aboutsummaryrefslogtreecommitdiffstats
path: root/services/database
diff options
context:
space:
mode:
Diffstat (limited to 'services/database')
-rw-r--r--services/database/__init__.py38
-rw-r--r--services/database/db_bridge.py65
-rw-r--r--services/database/db_checklist.py526
-rw-r--r--services/database/db_cms.py278
-rwxr-xr-xservices/database/db_general.py485
-rw-r--r--services/database/db_user.py515
-rw-r--r--services/database/db_virtual_function.py286
7 files changed, 0 insertions, 2193 deletions
diff --git a/services/database/__init__.py b/services/database/__init__.py
deleted file mode 100644
index 32b601a..0000000
--- a/services/database/__init__.py
+++ /dev/null
@@ -1,38 +0,0 @@
-
-# ============LICENSE_START==========================================
-# org.onap.vvp/test-engine
-# ===================================================================
-# Copyright © 2017 AT&T Intellectual Property. All rights reserved.
-# ===================================================================
-#
-# Unless otherwise specified, all software contained herein is licensed
-# under the Apache License, Version 2.0 (the “License”);
-# you may not use this software except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-#
-#
-# Unless otherwise specified, all documentation contained herein is licensed
-# under the Creative Commons License, Attribution 4.0 Intl. (the “License”);
-# you may not use this documentation except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# https://creativecommons.org/licenses/by/4.0/
-#
-# Unless required by applicable law or agreed to in writing, documentation
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-# ============LICENSE_END============================================
-#
-# ECOMP is a trademark and service mark of AT&T Intellectual Property.
diff --git a/services/database/db_bridge.py b/services/database/db_bridge.py
deleted file mode 100644
index 1eb79fa..0000000
--- a/services/database/db_bridge.py
+++ /dev/null
@@ -1,65 +0,0 @@
-
-# ============LICENSE_START==========================================
-# org.onap.vvp/test-engine
-# ===================================================================
-# Copyright © 2017 AT&T Intellectual Property. All rights reserved.
-# ===================================================================
-#
-# Unless otherwise specified, all software contained herein is licensed
-# under the Apache License, Version 2.0 (the “License”);
-# you may not use this software except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-#
-#
-# Unless otherwise specified, all documentation contained herein is licensed
-# under the Creative Commons License, Attribution 4.0 Intl. (the “License”);
-# you may not use this documentation except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# https://creativecommons.org/licenses/by/4.0/
-#
-# Unless required by applicable law or agreed to in writing, documentation
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-# ============LICENSE_END============================================
-#
-# ECOMP is a trademark and service mark of AT&T Intellectual Property.
-
-
-class DBBridge:
-
- """
- This class helps to use functions inside classes
- with circular import (dependencies).
- Use this class only when there is circular
- import in one of the DB services.
- """
-
- @staticmethod
- def select_personal_next_step(user_email):
- """select_personal_next_step: Originally """ +\
- """can be found under DBUser class."""
- from services.database.db_user import DBUser
- return DBUser.select_personal_next_step(user_email)
-
- @staticmethod
- def helper_rand_string(type, num=""):
- from services.helper import Helper
- return Helper.rand_string(type, num)
-
- @staticmethod
- def helper_internal_assert(arg1, arg2):
- from services.helper import Helper
- return Helper.internal_assert(arg1, arg2)
diff --git a/services/database/db_checklist.py b/services/database/db_checklist.py
deleted file mode 100644
index 0f8fd6e..0000000
--- a/services/database/db_checklist.py
+++ /dev/null
@@ -1,526 +0,0 @@
-
-# ============LICENSE_START==========================================
-# org.onap.vvp/test-engine
-# ===================================================================
-# Copyright © 2017 AT&T Intellectual Property. All rights reserved.
-# ===================================================================
-#
-# Unless otherwise specified, all software contained herein is licensed
-# under the Apache License, Version 2.0 (the “License”);
-# you may not use this software except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-#
-#
-# Unless otherwise specified, all documentation contained herein is licensed
-# under the Creative Commons License, Attribution 4.0 Intl. (the “License”);
-# you may not use this documentation except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# https://creativecommons.org/licenses/by/4.0/
-#
-# Unless required by applicable law or agreed to in writing, documentation
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-# ============LICENSE_END============================================
-#
-# ECOMP is a trademark and service mark of AT&T Intellectual Property.
-import time
-from uuid import uuid4
-
-from django.utils import timezone
-import psycopg2
-
-from services.constants import Constants
-from services.database.db_general import DBGeneral
-from services.logging_service import LoggingServiceFactory
-from services.session import session
-
-
-logger = LoggingServiceFactory.get_logger()
-
-
-class DBChecklist:
-
- @staticmethod
- def select_where_approval_state(
- queryColumnName,
- queryTableName,
- whereParametrType,
- whereParametrValue,
- fetchNum):
- try:
- dbConn = psycopg2.connect(
- DBGeneral.return_db_native_connection('em_db'))
- dbConn = dbConn
- cur = dbConn.cursor()
- queryStr = \
- "select %s from %s " % (queryColumnName, queryTableName) +\
- "Where %s = '%s'" % (whereParametrType, whereParametrValue) +\
- " and state = 'approval';"
- logger.debug("Query : " + queryStr)
- cur.execute(queryStr)
- if (fetchNum == 0):
- result = str(cur.fetchall())
- elif (fetchNum == 1):
- result = str(cur.fetchone())
- if(result.find("',)") != -1): # formatting strings e.g uuid
- result = result.partition('\'')[-1].rpartition('\'')[0]
- elif(result.find(",)") != -1): # formatting ints e.g id
- result = result.partition('(')[-1].rpartition(',')[0]
- dbConn.close()
- logger.debug("Query result: " + str(result))
- if result is None:
- errorMsg = "select_where_approval_state FAILED "
- logger.error(errorMsg)
- raise
- return result
- # If failed - count the failure and add the error to list of errors.
- except BaseException:
- errorMsg = "select_where_approval_state FAILED "
- raise Exception(errorMsg, "select_where_approval_state FAILED")
-
- @staticmethod
- def select_where_pr_state(
- queryColumnName,
- queryTableName,
- whereParametrType,
- whereParametrValue,
- fetchNum):
- try:
- dbConn = psycopg2.connect(
- DBGeneral.return_db_native_connection('em_db'))
- dbConn = dbConn
- cur = dbConn.cursor()
- queryStr = \
- "select %s from %s " % (queryColumnName, queryTableName) +\
- "Where %s = '%s' and " % (
- whereParametrType, whereParametrValue) +\
- "state = 'peer_review';"
- logger.debug("Query : " + queryStr)
- cur.execute(queryStr)
- if (fetchNum == 0):
- result = str(cur.fetchall())
- elif (fetchNum == 1):
- result = str(cur.fetchone())
- if(result.find("',)") != -1): # formatting strings e.g uuid
- result = result.partition('\'')[-1].rpartition('\'')[0]
- elif(result.find(",)") != -1): # formatting ints e.g id
- result = result.partition('(')[-1].rpartition(',')[0]
- dbConn.close()
- if result is None:
- errorMsg = "select_where_pr_state FAILED "
- logger.error(errorMsg)
- raise
- logger.debug("Query result: " + str(result))
- return result
- # If failed - count the failure and add the error to list of errors.
- except BaseException:
- errorMsg = "select_where FAILED "
- raise Exception(errorMsg, "select_where")
-
- @staticmethod
- def select_where_cl_not_archive(
- queryColumnName,
- queryTableName,
- whereParametrType,
- whereParametrValue,
- fetchNum):
- try:
- dbConn = psycopg2.connect(
- DBGeneral.return_db_native_connection('em_db'))
- dbConn = dbConn
- cur = dbConn.cursor()
- queryStr = \
- "select %s from %s " % (queryColumnName, queryTableName) +\
- "Where %s = '%s'" % (whereParametrType, whereParametrValue) +\
- "and state != 'archive';"
- logger.debug("Query : " + queryStr)
- cur.execute(queryStr)
- if (fetchNum == 0):
- result = str(cur.fetchall())
- elif (fetchNum == 1):
- result = str(cur.fetchone())
- if(result.find("',)") != -1): # formatting strings e.g uuid
- result = result.partition('\'')[-1].rpartition('\'')[0]
- elif(result.find(",)") != -1): # formatting ints e.g id
- result = result.partition('(')[-1].rpartition(',')[0]
- dbConn.close()
- logger.debug("Query result: " + str(result))
- return result
- # If failed - count the failure and add the error to list of errors.
- except BaseException:
- errorMsg = "select_where FAILED "
- raise Exception(errorMsg, "select_where")
-
- @staticmethod
- def select_native_where(
- queryColumnName,
- queryTableName,
- whereParametrType,
- whereParametrValue,
- fetchNum):
- try:
- dbConn = psycopg2.connect(
- DBGeneral.return_db_native_connection('em_db'))
- dbConn = dbConn
- cur = dbConn.cursor()
- queryStr = "select %s from %s Where %s = '%s';" % (
- queryColumnName, queryTableName, whereParametrType,
- whereParametrValue)
- logger.debug("Query : " + queryStr)
- cur.execute(queryStr)
- if (fetchNum == 0):
- result = str(cur.fetchall())
- elif (fetchNum == 1):
- result = str(cur.fetchone())
- if(result.find("',)") != -1): # formatting strings e.g uuid
- result = result.partition('\'')[-1].rpartition('\'')[0]
- elif(result.find(",)") != -1): # formatting ints e.g id
- result = result.partition('(')[-1].rpartition(',')[0]
- dbConn.close()
- logger.debug("Query result: " + str(result))
- return result
- # If failed - count the failure and add the error to list of errors.
- except BaseException:
- errorMsg = "select_where FAILED "
- raise Exception(errorMsg, "select_where")
-
- @staticmethod
- def update_checklist_to_review_state(queryTableName):
- try:
- dbConn = psycopg2.connect(
- DBGeneral.return_db_native_connection('em_db'))
- dbConn = dbConn
- cur = dbConn.cursor()
- queryStr = "UPDATE ice_checklist SET state='review' Where " +\
- "name= '%s' and state= 'pending';" % (
- queryTableName)
- logger.debug("Query : " + queryStr)
- cur.execute(queryStr)
- dbConn.commit()
- dbConn.close()
- # If failed - count the failure and add the error to list of errors.
- except BaseException:
- errorMsg = "Could not Update User"
- raise Exception(errorMsg, "Update")
-
- @staticmethod
- def update_all_decisions_to_approve(whereParametrValue):
- try:
- dbConn = psycopg2.connect(
- DBGeneral.return_db_native_connection('em_db'))
- dbConn = dbConn
- cur = dbConn.cursor()
- queryStr = "UPDATE ice_checklist_decision SET " +\
- "review_value='approved' , peer_review_value='approved' " +\
- "Where checklist_id = '%s';" % (
- whereParametrValue)
- logger.debug(queryStr)
- cur.execute(queryStr)
- dbConn.commit()
- logger.debug("Query : " + queryStr)
- # If failed - count the failure and add the error to list of errors.
- except Exception as e:
- errorMsg = "Could not Update User"
- logger.debug(e)
- raise Exception(errorMsg, "Update")
- finally:
- dbConn.close()
-
- @staticmethod
- def is_archive(checklistName):
- try:
- result = False
- # Fetch all AT&T user ID.
- checklist_ids = DBGeneral.select_where(
- "uuid", "ice_checklist", "name", checklistName, 0)
-# checklist_ids = DBGeneral.list_format(checklist_ids)
- for checklist_id in checklist_ids: # Second Example
- if isinstance(checklist_id, tuple):
- checklist_id = checklist_id[0]
- state = DBGeneral.select_where(
- "state", "ice_checklist", "uuid", checklist_id, 1)
- if state == "archive":
- result = True
- break
- return result
- # If failed - count the failure and add the error to list of errors.
- except BaseException:
- errorMsg = "is_archive FAILED "
- raise Exception(errorMsg, "is_archive")
-
- @staticmethod
- def get_pr_email(checklistUuid):
- try:
- # Fetch one AT&T user ID.
- owner_id = DBChecklist.select_where_pr_state(
- "owner_id", "ice_checklist", "uuid", checklistUuid, 1)
- engLeadEmail = DBGeneral.select_where(
- "email", "ice_user_profile", "id", owner_id, 1)
- logger.debug("get_pr_email = " + engLeadEmail)
- return engLeadEmail
- # If failed - count the failure and add the error to list of errors.
- except Exception as e:
- errorMsg = "get_pr_email FAILED " + str(e)
- raise Exception(errorMsg, "get_pr_email")
-
- @staticmethod
- def get_admin_email(checklistUuid):
- try:
- # Fetch one AT&T user ID.
- owner_id = DBChecklist.select_where_approval_state(
- "owner_id", "ice_checklist", "uuid", checklistUuid, 1)
- engLeadEmail = DBGeneral.select_where(
- "email", "ice_user_profile", "id", owner_id, 1)
- logger.debug("get_admin_email = " + engLeadEmail)
- return engLeadEmail
- # If failed - count the failure and add the error to list of errors.
- except BaseException:
- errorMsg = "get_admin_email FAILED "
- raise Exception(errorMsg, "get_admin_email")
-
- @staticmethod
- def get_owner_email(checklistUuid):
- try:
- # Fetch one AT&T user ID.
- owner_id = DBChecklist.select_native_where(
- "owner_id", "ice_checklist", "uuid", checklistUuid, 1)
- engLeadEmail = DBGeneral.select_where(
- "email", "ice_user_profile", "id", owner_id, 1)
- logger.debug("getPreeReviewerEngLeadEmail = " + engLeadEmail)
- return engLeadEmail
- # If failed - count the failure and add the error to list of errors.
- except BaseException:
- errorMsg = "get_admin_email FAILED "
- raise Exception(errorMsg, "get_owner_email")
-
- @staticmethod
- def update_decisions(checklistUuid, checklistName):
- checklistTempid = DBGeneral.select_where(
- "template_id", "ice_checklist", "name", checklistName, 1)
- checklistLineItems = DBGeneral.select_where_and(
- "uuid",
- "ice_checklist_line_item",
- "line_type",
- "auto",
- "template_id",
- checklistTempid,
- 0)
- for lineItem in checklistLineItems:
- setParametrType2 = "peer_review_value"
- setParametrValue2 = "approved"
- whereParametrType2 = "lineitem_id"
- whereParametrValue2 = lineItem
- DBGeneral.update_where_and(
- "ice_checklist_decision",
- "review_value",
- checklistUuid,
- "approved",
- "checklist_id",
- setParametrType2,
- setParametrValue2,
- whereParametrType2,
- whereParametrValue2)
-
- @staticmethod
- def checkChecklistIsUpdated():
- query = "select uuid from ice_checklist_section where template_id " +\
- "in (select template_id from ice_checklist_template where " +\
- "name='{template_name}') and name='{section_name}'".format(
- template_name=Constants.Dashboard.LeftPanel.
- EditChecklistTemplate.HEAT, section_name=Constants.
- Dashboard.LeftPanel.EditChecklistTemplate.HEAT)
- return DBGeneral.select_query(query)
-
- @staticmethod
- def fetchEngByVfName(vfName):
- # Fetch one AT&T user ID.
- return DBGeneral.select_where(
- "engagement_id", "ice_vf", "name", vfName, 1)
-
- @staticmethod
- def fetchEngManIdByEngUuid(engagement_id):
- return DBGeneral.select_where(
- "engagement_manual_id",
- "ice_engagement",
- "uuid",
- engagement_id,
- 1)
-
- @staticmethod
- def fetchChecklistByName(checklistName):
- query = "select uuid from ice_checklist where " +\
- "name='{cl_name}'".format(
- cl_name=checklistName)
- return DBGeneral.select_query(query)
-
- @staticmethod
- def create_default_heat_teampleate():
- template_query = "INSERT INTO public.ice_checklist_template(uuid, " +\
- "name, category, version, create_time)"\
- "VALUES ('%s', '%s', '%s', '%s', '%s');" % (
- str(uuid4()), 'Editing Heat', 'first category', '1',
- timezone.now())
- DBGeneral.insert_query(template_query)
- template_id = DBGeneral.select_query(
- "SELECT uuid FROM public.ice_checklist_template where " +
- "name = 'Editing Heat'")
- # SECTIONS
- section1_query = "INSERT INTO public.ice_checklist_section(uuid, " +\
- "name, weight, description, validation_instructions, " +\
- "create_time, template_id) "\
- "VALUES ('%s', '%s', '%s', '%s', '%s','%s', '%s');" % (
- str(uuid4()), 'External References',
- '1', 'section descripyion', 'valid instructions',
- timezone.now(), template_id)
- DBGeneral.insert_query(section1_query)
- section1_id = DBGeneral.select_query(
- ("""SELECT uuid FROM public.ice_checklist_section """ +
- """where name = 'External References' """ +
- """and template_id = '{s}'""").format(
- s=template_id))
- section2_query = "INSERT INTO public.ice_checklist_section(uuid, " +\
- "name, weight, description, validation_instructions, " +\
- "create_time, template_id) "\
- "VALUES ('%s', '%s', '%s', '%s', '%s','%s', '%s');" % (
- str(uuid4()), 'Parameter Specification',
- '2', 'section descripyion', 'valid instructions',
- timezone.now(), template_id)
- DBGeneral.insert_query(section2_query)
- section2_id = DBGeneral.select_query(
- ("""SELECT uuid FROM public.ice_checklist_section """ +
- """where name = """ +
- """'Parameter Specification' and template_id = '{s}'""").format(
- s=template_id))
- # Line items
- line_item1 = \
- "INSERT INTO public.ice_checklist_line_item(uuid, " +\
- "name, weight, description, line_type, validation_instructions," +\
- "create_time,section_id, template_id) "\
- "VALUES ('%s', '%s', " % (str(uuid4()), 'Normal references') +\
- "'%s', " % '1' +\
- "'%s'," % 'Numeric parameters should include ' +\
- 'range and/or allowed values.' +\
- " '%s'," % 'manual', +\
- "'%s'" % 'Here are some useful tips ' +\
- 'for how to validate this item ' +\
- 'in the most awesome way:<br><br><ul><li>Here is my ' +\
- 'awesome tip 1</li><li>Here is my awesome tip 2</li><li>' +\
- 'Here is my awesome tip 3</li></ul>' +\
- ", '%s'" % timezone.now() +\
- ", '%s'," % section1_id +\
- " '%s');" % template_id
- DBGeneral.insert_query(line_item1)
- line_item2 = "INSERT INTO public.ice_checklist_line_item(uuid, " +\
- "name, weight, description, line_type, validation_instructions," +\
- "create_time, section_id, template_id) "\
- "VALUES ('%s', '%s', '%s', '%s', '%s','%s', '%s', '%s', '%s');" % (
- str(uuid4()), 'String parameters', '2',
- 'Numeric parameters should include range ' +
- 'and/or allowed values.', 'auto',
- 'Here are some useful tips for how to validate this item ' +
- 'in the most awesome way:<br><br><ul><li>Here is my ' +
- 'awesome tip 1</li><li>Here is my awesome tip 2</li><li>' +
- 'Here is my awesome tip 3</li></ul>', timezone.now(),
- section2_id, template_id)
- DBGeneral.insert_query(line_item2)
- line_item3 = "INSERT INTO public.ice_checklist_line_item(uuid, " +\
- "name, weight, description, line_type, validation_instructions," +\
- "create_time,section_id, template_id) "\
- "VALUES ('%s', '%s', '%s', '%s', '%s','%s', " +\
- "'%s', '%s', '%s');" % (
- str(uuid4()), 'Numeric parameters', '3',
- 'Numeric parameters should include range and/or ' +
- 'allowed values.', 'manual',
- 'Here are some useful tips for how to validate this item ' +
- 'in the most awesome way:<br><br><ul><li>Here is my ' +
- 'awesome tip 1</li><li>Here is my awesome tip 2</li><li>' +
- 'Here is my awesome tip 3</li></ul>', timezone.now(),
- section2_id, template_id)
- DBGeneral.insert_query(line_item3)
- line_item4 = "INSERT INTO public.ice_checklist_line_item(uuid, " +\
- "name, weight, description, line_type, " +\
- "validation_instructions,create_time, section_id, " +\
- "template_id) "\
- "VALUES ('%s', '%s', '%s', '%s', '%s','%s', '%s', " +\
- "'%s', '%s');" % (
- str(uuid4()), 'VF image', '2',
- 'Numeric parameters should include range and/or ' +
- 'allowed values.', 'auto',
- 'Here are some useful tips for how to validate this ' +
- 'item in the most awesome way:<br><br><ul><li>Here is ' +
- 'my awesome tip 1</li><li>Here is my awesome tip 2' +
- '</li><li>Here is my awesome tip 3</li></ul>',
- timezone.now(), section1_id, template_id)
- DBGeneral.insert_query(line_item4)
- line_item5 = "INSERT INTO public.ice_checklist_line_item(uuid, " +\
- "name, weight, description, line_type, validation_instructions," +\
- "create_time,section_id, template_id) "\
- "VALUES ('%s', '%s', '%s', '%s', '%s','%s', '%s'," +\
- " '%s', '%s');" % (str(
- uuid4()), 'Parameters', '1',
- 'Numeric parameters should include range ' +
- 'and/or allowed values.', 'auto',
- 'Here are some useful tips for how to validate this item ' +
- 'in the most awesome way:<br><br><ul><li>Here is my awesome ' +
- 'tip 1</li><li>Here is my awesome tip 2</li><li>Here is my ' +
- 'awesome tip 3</li></ul>', timezone.now(), section2_id,
- template_id)
- DBGeneral.insert_query(line_item5)
-
- @staticmethod
- def create_editing_cl_template_if_not_exist():
- template_id = DBGeneral.select_query(
- ("""SELECT uuid FROM public.ice_checklist_template """ +
- """where name = '{s}'""").format(
- s=Constants.Dashboard.LeftPanel.EditChecklistTemplate.HEAT))
- if template_id == 'None':
- DBChecklist.create_default_heat_teampleate()
- session.createTemplatecount = True
-
- @staticmethod
- def state_changed(identify_field, field_value, expected_state):
- get_state = DBGeneral.select_where_order_by_desc(
- "state", Constants.DBConstants.IceTables.CHECKLIST,
- identify_field, field_value, "create_time")[0]
- counter = 0
- while get_state != expected_state and \
- counter <= Constants.DBConstants.RETRIES_NUMBER:
- time.sleep(session.wait_until_time_pause_long)
- logger.debug(
- "Checklist state not changed yet ," +
- "expecting state: %s, current result: %s (attempt %s of %s)" %
- (expected_state, get_state, counter,
- Constants.DBConstants.RETRIES_NUMBER))
- counter += 1
- get_state = DBGeneral.select_where_order_by_desc(
- "state", Constants.DBConstants.IceTables.CHECKLIST,
- identify_field, field_value, "create_time")[0]
-
- if get_state == expected_state:
- logger.debug("Checklist state was successfully changed into: " +
- expected_state + ", and was verified over the DB")
- return expected_state
- raise Exception(
- "Expected checklist state never arrived " +
- expected_state,
- get_state)
-
- @staticmethod
- def get_recent_checklist_uuid(name):
- required_uuid = DBGeneral.select_where_not_and_order_by_desc(
- 'uuid', Constants.DBConstants.IceTables.CHECKLIST, 'name', name,
- 'state', Constants.ChecklistStates.Archive.TEXT, 'create_time')
- return required_uuid
diff --git a/services/database/db_cms.py b/services/database/db_cms.py
deleted file mode 100644
index 288121a..0000000
--- a/services/database/db_cms.py
+++ /dev/null
@@ -1,278 +0,0 @@
-# ============LICENSE_START==========================================
-# org.onap.vvp/test-engine
-# ===================================================================
-# Copyright © 2017 AT&T Intellectual Property. All rights reserved.
-# ===================================================================
-#
-# Unless otherwise specified, all software contained herein is licensed
-# under the Apache License, Version 2.0 (the “License”);
-# you may not use this software except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-#
-#
-# Unless otherwise specified, all documentation contained herein is licensed
-# under the Creative Commons License, Attribution 4.0 Intl. (the “License”);
-# you may not use this documentation except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# https://creativecommons.org/licenses/by/4.0/
-#
-# Unless required by applicable law or agreed to in writing, documentation
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-# ============LICENSE_END============================================
-#
-# ECOMP is a trademark and service mark of AT&T Intellectual Property.
-import psycopg2
-from wheel.signatures import assertTrue
-
-from services.database.db_general import DBGeneral
-from services.helper import Helper
-from services.logging_service import LoggingServiceFactory
-
-
-logger = LoggingServiceFactory.get_logger()
-
-
-class DBCMS:
-
- @staticmethod
- def insert_query(queryStr):
- try:
- nativeIceDb = psycopg2.connect(
- DBGeneral.return_db_native_connection('cms_db'))
- dbConn = nativeIceDb
- cur = dbConn.cursor()
- logger.debug("Query: " + queryStr)
- cur.execute(queryStr)
- dbConn.commit()
- dbConn.close()
- logger.debug("Insert query success!")
- # If failed - count the failure and add the error to list of errors.
- except BaseException:
- raise Exception("Couldn't fetch answer using the given query.")
-
- @staticmethod
- def update_query(queryStr):
- try:
- nativeIceDb = psycopg2.connect(
- DBGeneral.return_db_native_connection('cms_db'))
- dbConn = nativeIceDb
- cur = dbConn.cursor()
- logger.debug("Query: " + queryStr)
- cur.execute(queryStr)
- dbConn.commit()
- dbConn.close()
- logger.debug("Update query success!")
- # If failed - count the failure and add the error to list of errors.
- except BaseException:
- raise Exception("Couldn't fetch answer using the given query.")
-
- @staticmethod
- def select_query(queryStr):
- try:
- nativeIceDb = psycopg2.connect(
- DBGeneral.return_db_native_connection('cms_db'))
- dbConn = nativeIceDb
- cur = dbConn.cursor()
- logger.debug("Query: " + queryStr)
- cur.execute(queryStr)
- result = str(cur.fetchone())
- if(result.find("',)") != -1): # formatting strings e.g uuid
- result = result.partition('\'')[-1].rpartition('\'')[0]
- elif(result.find(",)") != -1): # formatting ints e.g id
- result = result.partition('(')[-1].rpartition(',')[0]
- dbConn.close()
- logger.debug("Query result: " + str(result))
- return result
- # If failed - count the failure and add the error to list of errors.
- except BaseException:
- raise Exception("Couldn't fetch answer using the given query.")
-
- @staticmethod
- def get_cms_category_id(categoryName):
- logger.debug("Get DBCMS category id for name: " + categoryName)
- queryStr = "SELECT id FROM public.blog_blogcategory WHERE " +\
- "title = '%s' LIMIT 1;" % (categoryName)
- logger.debug("Query : " + queryStr)
- result = DBCMS.select_query(queryStr)
- return result
-
- @staticmethod
- def insert_cms_new_post(title, description, categoryName):
- logger.debug("Insert new post : " + title)
- queryStr = "INSERT INTO public.blog_blogpost" \
- "(comments_count, keywords_string, rating_count, rating_sum, " +\
- "rating_average, title, slug, _meta_title, description, " +\
- "gen_description, created, updated, status, publish_date, " +\
- "expiry_date, short_url, in_sitemap, content, allow_comments, " +\
- "featured_image, site_id, user_id) "\
- "VALUES (0, '', 0, 0, 0, " +\
- "'%s', '%s-slug', " % (title, title) +\
- "'', '%s', true, " % description +\
- "current_timestamp - interval '1 day', current_timestamp - " +\
- "interval '2 day', 2, current_timestamp - interval '1 day', " +\
- "NULL, '', true, '<p>%s</p>', true, '', 1, 1);" % description
- logger.debug("Query : " + queryStr)
- DBCMS.insert_query(queryStr)
- post_id = DBCMS.get_last_added_post_id()
- categoryId = DBCMS.get_cms_category_id(categoryName)
- DBCMS.add_category_to_post(post_id, categoryId)
- return post_id
-
- @staticmethod
- def get_last_added_post_id():
- logger.debug("Get the id of the post inserted")
- queryStr = "select MAX(id) FROM public.blog_blogpost;"
- logger.debug("Query : " + queryStr)
- result = DBCMS.select_query(queryStr)
- return result
-
- @staticmethod
- def update_days(xdays, title):
- logger.debug("Get the id of the post inserted")
- queryStr = "UPDATE public.blog_blogpost SET " +\
- "created=current_timestamp - interval '%s day' " % xdays +\
- "WHERE title='%s';" % title
- logger.debug("Query : " + queryStr)
- result = DBCMS.update_query(queryStr)
- return result
-
- @staticmethod
- def add_category_to_post(postId, categoryId):
- logger.debug("bind category into inserted post: " + postId)
- queryStr = "INSERT INTO public.blog_blogpost_categories" +\
- "(blogpost_id, blogcategory_id) " +\
- "VALUES (%s, %s);" % (postId, categoryId)
- logger.debug("Query : " + queryStr)
- DBCMS.insert_query(queryStr)
-
- @staticmethod
- def get_documentation_page_id():
- logger.debug("Retrive id of documentation page: ")
- queryStr = "SELECT id FROM public.pages_page WHERE " +\
- "title = 'Documentation' LIMIT 1;"
- logger.debug("Query : " + queryStr)
- result = DBCMS.select_query(queryStr)
- return result
-
- @staticmethod
- def get_last_inserted_page_id():
- logger.debug("Retrive id of last page inserted: ")
- queryStr = "select MAX(id) FROM public.pages_page;"
- logger.debug("Query : " + queryStr)
- result = DBCMS.select_query(queryStr)
- return result
-
- @staticmethod
- def delete_old_tips_of_the_day():
- logger.debug("Delete all posts ")
- queryStr = "DELETE FROM public.blog_blogpost_categories WHERE id>0;"
- logger.debug("Query : " + queryStr)
- DBCMS.insert_query(queryStr)
- queryStr = "DELETE FROM public.blog_blogpost WHERE id>0;;"
- logger.debug("Query : " + queryStr)
- DBCMS.insert_query(queryStr)
-
- @staticmethod
- def insert_page(title, content, parent_id=None):
- logger.debug("Retrive id of documentation page: ")
- if parent_id is None:
- parent_id = DBCMS.get_documentation_page_id()
- queryStr = "INSERT INTO public.pages_page(" \
- "keywords_string, title, slug, _meta_title, description, " +\
- "gen_description, created, updated, status, publish_date, " +\
- "expiry_date, short_url, in_sitemap, _order, in_menus, titles, " +\
- "content_model, login_required, parent_id, site_id)" \
- "VALUES ('', " +\
- "'%s', '%s-slug'" % (title, title) +\
- ", '', '%s', true, " % content +\
- "current_timestamp - interval '1 day', current_timestamp " +\
- "- interval '1 day', 2, current_timestamp - interval '1 day', " +\
- "NULL, '', true, 0, '1,2,3', " +\
- "'%s', 'richtextpage', " % title +\
- "true, %s, 1);" % parent_id
- logger.debug("Query : " + queryStr)
- DBCMS.insert_query(queryStr)
-
- createdPageId = DBCMS.get_last_inserted_page_id()
- logger.debug(
- "Bind the page with the rich text content related to this page")
- queryStr = "INSERT INTO public.pages_richtextpage(page_ptr_id, " +\
- "content) VALUES (%s, '<p>%s</p>');" % (
- createdPageId, content)
- logger.debug("Query : " + queryStr)
- DBCMS.insert_query(queryStr)
- return createdPageId
-
- @staticmethod
- def create_faq():
- title = "title_FAQ" + Helper.rand_string("randomString")
- description = "description_FAQ_" + Helper.rand_string("randomString")
- DBCMS.delete_old_tips_of_the_day()
- postId = DBCMS.insert_cms_new_post(title, description, "FAQ")
- assertTrue(len(postId) > 0 and not None)
- return title, description
-
- @staticmethod
- def create_news():
- title = "title_News" + Helper.rand_string("randomString")
- description = "description_News" + Helper.rand_string("randomString")
- postId = DBCMS.insert_cms_new_post(title, description, "News")
- assertTrue(len(postId) > 0 and not None)
- return title, description
-
- @staticmethod
- def create_announcement():
- title = "title_Announcement_" + Helper.rand_string("randomString")
- description = "description_Announcement_" + \
- Helper.rand_string("randomString")
- postId = DBCMS.insert_cms_new_post(title, description, "Announcement")
- assertTrue(len(postId) > 0 and not None)
- return title, description
-
- @staticmethod
- def create_page(parent_id=None):
- title = "title_Of_Page_" + Helper.rand_string("randomString")
- description = "description_Of_Page_" + \
- Helper.rand_string("randomString")
- createdPageId = DBCMS.insert_page(title, description)
- assertTrue(len(createdPageId) > 0 and not None)
- return title, description
-
- @staticmethod
- def update_X_days_back_post(title, xdays):
- logger.debug("Get the id of the post inserted")
- queryStr = "UPDATE blog_blogpost SET created = current_timestamp" +\
- " - interval '%s day', " % xdays +\
- "publish_date=current_timestamp - " +\
- "interval '%s day' WHERE title= '%s' ;" % (xdays, title)
- logger.debug("Query : " + queryStr)
- DBCMS.update_query(queryStr)
-
- @staticmethod
- def create_announcements(x):
- listOfTitleAnDescriptions = []
- for _ in range(x):
- # print x ->str
- title = "title_Announcement_" + Helper.rand_string("randomString")
- description = "description_Announcement_" + \
- Helper.rand_string("randomString")
- postId = DBCMS.insert_cms_new_post(
- title, description, "Announcement")
- assertTrue(len(postId) > 0 and not None)
- xList = [title, description]
- listOfTitleAnDescriptions.append(xList)
- return listOfTitleAnDescriptions
diff --git a/services/database/db_general.py b/services/database/db_general.py
deleted file mode 100755
index 2c83fb0..0000000
--- a/services/database/db_general.py
+++ /dev/null
@@ -1,485 +0,0 @@
-# ============LICENSE_START==========================================
-# org.onap.vvp/test-engine
-# ===================================================================
-# Copyright © 2017 AT&T Intellectual Property. All rights reserved.
-# ===================================================================
-#
-# Unless otherwise specified, all software contained herein is licensed
-# under the Apache License, Version 2.0 (the “License”);
-# you may not use this software except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-#
-#
-# Unless otherwise specified, all documentation contained herein is licensed
-# under the Creative Commons License, Attribution 4.0 Intl. (the “License”);
-# you may not use this documentation except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# https://creativecommons.org/licenses/by/4.0/
-#
-# Unless required by applicable law or agreed to in writing, documentation
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-# ============LICENSE_END============================================
-#
-# ECOMP is a trademark and service mark of AT&T Intellectual Property.
-from datetime import datetime
-import sqlite3
-
-from django.conf import settings
-from django.db import transaction
-import psycopg2
-
-from services.logging_service import LoggingServiceFactory
-
-
-logger = LoggingServiceFactory.get_logger()
-
-
-class DBGeneral:
-
- @staticmethod
- # desigredDB: Use 'default' for CI General and 'em_db' for EM General
- # (according to settings.DATABASES).
- def return_db_native_connection(desigredDB):
- dbConnectionStr = "dbname='" + str(
- settings.SINGLETONE_DB[desigredDB]['NAME']) + \
- "' user='" + str(settings.SINGLETONE_DB[desigredDB]['USER']) + \
- "' host='" + str(settings.SINGLETONE_DB[desigredDB]['HOST']) + \
- "' password='" + str(
- settings.SINGLETONE_DB[desigredDB]['PASSWORD']) + \
- "' port='" + \
- str(settings.SINGLETONE_DB[desigredDB]['PORT']) + "'"
- return dbConnectionStr
-
- @staticmethod
- def insert_results(
- testType,
- testFeature,
- testResult,
- testName,
- testDuration,
- notes=" "):
- try:
- if settings.DATABASE_TYPE == 'sqlite':
- dbfile = str(settings.DATABASES['default']['TEST_NAME'])
- dbConn = sqlite3.connect(dbfile)
- cur = dbConn.cursor()
- else:
- # Connect to General 'default'.
- dbConn = psycopg2.connect(
- DBGeneral.return_db_native_connection("default"))
- dbConn = dbConn
- cur = dbConn.cursor()
- except Exception as e:
- errorMsg = "Failed to create connection to General." + str(e)
- raise Exception(errorMsg)
- try: # Create INSERT query.
- if settings.DATABASE_TYPE == 'sqlite':
- query_str = 'INSERT INTO ice_test_results ' +\
- '(testType, testFeature, testResult, testName, notes,'\
- 'create_time, build_id, duration) VALUES ' +\
- '(?, ?, ?, ?, ?, ?, ?, ?);'
- else:
- query_str = 'INSERT INTO ice_test_results ("testType", ' +\
- '"testFeature", "testResult", "testName", notes,'\
- 'create_time, build_id, duration) VALUES ' +\
- '(%s, %s, %s, %s, %s, %s, %s, %s);'
- cur.execute(query_str, (testType, testFeature, testResult,
- testName, notes,
- str(datetime.now()),
- settings.ICE_BUILD_REPORT_NUM,
- testDuration))
- dbConn.commit()
- logger.debug("Test result in DB - " + testResult)
- except Exception as e:
- logger.error(e)
- errorMsg = "Failed to insert results to DB." + str(e)
- raise Exception(errorMsg)
- dbConn.close()
-
- @staticmethod
- def select_query(queryStr, return_type="str", fetch_num=1):
- try:
- dbConn = psycopg2.connect(
- DBGeneral.return_db_native_connection('em_db'))
- dbConn = dbConn
- cur = dbConn.cursor()
- logger.debug("Query: " + queryStr)
- cur.execute(queryStr)
- if return_type == "str":
- if fetch_num == 1:
- result = str(cur.fetchone())
- else:
- result = str(cur.fetchall())
- if result != 'None':
- # formatting strings e.g uuid
- if(result.find("',)") != -1):
- result = result.partition('\'')[-1].rpartition('\'')[0]
- elif(result.find(",)") != -1): # formatting ints e.g id
- result = result.partition('(')[-1].rpartition(',')[0]
- if return_type == "list":
- if fetch_num == 1:
- result = list(cur.fetchone())
- else:
- result = [item[0] for item in cur.fetchall()]
- dbConn.close()
- logger.debug("Query result: " + str(result))
- return result
- except BaseException:
- raise Exception("Couldn't fetch answer using the given query.")
-
- @staticmethod
- def insert_query(queryStr):
- try:
- nativeIceDb = psycopg2.connect(
- DBGeneral.return_db_native_connection('em_db'))
- dbConn = nativeIceDb
- cur = dbConn.cursor()
- logger.debug("Query: " + queryStr)
- cur.execute(queryStr)
- dbConn.commit()
- dbConn.close()
- logger.debug("Insert query success!")
- # If failed - count the failure and add the error to list of errors.
- except Exception as e:
- logger.error(e)
- transaction.rollback()
- raise Exception("Couldn't fetch answer using the given query.")
-
- @staticmethod
- def update_query(queryStr):
- try:
- nativeIceDb = psycopg2.connect(
- DBGeneral.return_db_native_connection('em_db'))
- dbConn = nativeIceDb
- cur = dbConn.cursor()
- logger.debug("Query: " + queryStr)
- cur.execute(queryStr)
- dbConn.commit()
- dbConn.close()
- logger.debug("Update query success!")
- # If failed - count the failure and add the error to list of errors.
- except Exception as e:
- logger.error(e)
- transaction.rollback()
- raise Exception("Couldn't fetch answer using the given query.")
-
- @staticmethod
- def select_where_email(queryColumnName, queryTableName, email):
- try:
- dbConn = psycopg2.connect(
- DBGeneral.return_db_native_connection('em_db'))
- dbConn = dbConn
- cur = dbConn.cursor()
- queryStr = "select %s from %s WHERE Email = '%s';" % (
- queryColumnName, queryTableName, email)
- logger.debug("Query : " + queryStr)
- cur.execute(queryStr)
- result = str(cur.fetchone())
- if(result.find("',)") != -1): # formatting strings e.g uuid
- result = result.partition('\'')[-1].rpartition('\'')[0]
- elif(result.find(",)") != -1): # formatting ints e.g id
- result = result.partition('(')[-1].rpartition(',')[0]
- dbConn.close()
- logger.debug("Query result: " + str(result))
- return result
- # If failed - count the failure and add the error to list of errors.
- except BaseException:
- errorMsg = "select_where_email FAILED "
- raise Exception(errorMsg, "select_where_email")
- raise
-
- @staticmethod
- def select_from(queryColumnName, queryTableName, fetchNum):
- try:
- dbConn = psycopg2.connect(
- DBGeneral.return_db_native_connection('em_db'))
- cur = dbConn.cursor()
- queryStr = "select %s from %s;" % (queryColumnName, queryTableName)
- logger.debug("Query : " + queryStr)
- cur.execute(queryStr)
- if (fetchNum == 0):
- result = str(cur.fetchall())
- elif (fetchNum == 1):
- result = str(cur.fetchone())
- if(result.find("',)") != -1): # formatting strings e.g uuid
- result = result.partition('\'')[-1].rpartition('\'')[0]
- elif(result.find(",)") != -1): # formatting ints e.g id
- result = result.partition('(')[-1].rpartition(',')[0]
- dbConn.close()
- logger.debug("Query result: " + str(result))
- return result
- # If failed - count the failure and add the error to list of errors.
- except Exception as e:
- errorMsg = "select_from FAILED " + str(e)
- raise Exception(errorMsg, "select_from")
-
- @staticmethod
- def select_where(
- queryColumnName,
- queryTableName,
- whereParametrType,
- whereParametrValue,
- fetchNum):
- try:
- dbConn = psycopg2.connect(
- DBGeneral.return_db_native_connection('em_db'))
- cur = dbConn.cursor()
- queryStr = "select %s from %s Where %s = '%s';" % (
- queryColumnName, queryTableName, whereParametrType,
- whereParametrValue)
- logger.debug("Query : " + queryStr)
- cur.execute(queryStr)
- if (fetchNum == 0):
- result = list(cur.fetchall())
- elif (fetchNum == 1):
- result = str(cur.fetchone())
- if(result.find("',)") != -1): # formatting strings e.g uuid
- result = result.partition('\'')[-1].rpartition('\'')[0]
- elif(result.find(",)") != -1): # formatting ints e.g id
- result = result.partition('(')[-1].rpartition(',')[0]
- dbConn.close()
- logger.debug("Query result: " + str(result))
- return result
- # If failed - count the failure and add the error to list of errors.
- except BaseException:
- errorMsg = "select_where FAILED "
- raise Exception(errorMsg, "select_where")
-
- @staticmethod
- def select_where_order_by_desc(
- queryColumnName,
- queryTableName,
- whereParametrType,
- whereParametrValue,
- order_by):
- dbConn = psycopg2.connect(
- DBGeneral.return_db_native_connection('em_db'))
- cur = dbConn.cursor()
- queryStr = \
- "select %s from %s " % (queryColumnName, queryTableName,) +\
- "Where %s = '%s' " % (whereParametrType, whereParametrValue) +\
- "order by %s desc limit 1;" % order_by
- logger.debug("Query : " + queryStr)
- cur.execute(queryStr)
- result = str(cur.fetchall())
- result = DBGeneral.list_format(result)
- dbConn.close()
- return result
-
- @staticmethod
- def select_where_dict(queryColumnName, queryTableName, whereParametrType):
- dbConn = psycopg2.connect(
- DBGeneral.return_db_native_connection('em_db'))
- cur = dbConn.cursor()
- x = ""
- count = 0
- for key, val in whereParametrType.items():
- x += "%s='%s'" % (key, val)
- if len(whereParametrType.items()) - count > 1:
- x += ' and '
- count += 1
- queryStr = "select %s from %s Where %s;" \
- % (queryColumnName, queryTableName, x)
- logger.debug("Query : " + queryStr)
- cur.execute(queryStr)
- result = str(cur.fetchall())
- result = DBGeneral.list_format(result)
- dbConn.close()
- return result
-
- @staticmethod
- def select_where_not_and_order_by_desc(
- queryColumnName,
- queryTableName,
- whereParametrType,
- whereParametrValue,
- parametrTypeAnd,
- parametrAnd,
- order_by):
- dbConn = psycopg2.connect(
- DBGeneral.return_db_native_connection('em_db'))
- cur = dbConn.cursor()
- queryStr = \
- "select %s from %s " % (queryColumnName, queryTableName) +\
- "Where %s = '%s' " % (whereParametrType, whereParametrValue) +\
- "and %s != '%s' " % (parametrTypeAnd, parametrAnd) +\
- "order by %s desc limit 1;" % order_by
- logger.debug("Query : " + queryStr)
- cur.execute(queryStr)
- result = str(cur.fetchall())
- result = DBGeneral.list_format(result)
- dbConn.close()
- return result
-
- @staticmethod
- def select_where_and(
- queryColumnName,
- queryTableName,
- whereParametrType,
- whereParametrValue,
- parametrTypeAnd,
- parametrAnd,
- fetchNum):
- try:
- dbConn = psycopg2.connect(
- DBGeneral.return_db_native_connection('em_db'))
- dbConn = dbConn
- cur = dbConn.cursor()
- queryStr = "select %s from %s Where %s = '%s' and %s = '%s';" % (
- queryColumnName, queryTableName, whereParametrType,
- whereParametrValue, parametrTypeAnd, parametrAnd)
- logger.debug("Query : " + queryStr)
- cur.execute(queryStr)
- if (fetchNum == 0):
- result = str(cur.fetchall())
- result = DBGeneral.list_format(result)
- elif (fetchNum == 1):
- result = str(cur.fetchone())
- if(result.find("',)") != -1): # formatting strings e.g uuid
- result = result.partition('\'')[-1].rpartition('\'')[0]
- elif(result.find(",)") != -1): # formatting ints e.g id
- result = result.partition('(')[-1].rpartition(',')[0]
- dbConn.close()
- logger.debug("Query result: " + str(result))
- return result
- # If failed - count the failure and add the error to list of errors.
- except BaseException:
- errorMsg = "select_where_and FAILED "
- raise Exception(errorMsg, "select_where_and")
-
- @staticmethod
- def select_where_is_bigger(
- queryColumnName,
- queryTableName,
- whereParametrType,
- whereParametrValue,
- parametrTypeAnd,
- parametrAnd,
- fetchNum):
- try:
- dbConn = psycopg2.connect(
- DBGeneral.return_db_native_connection('em_db'))
- dbConn = dbConn
- cur = dbConn.cursor()
- queryStr = "select %s from %s Where %s = '%s' and %s > %s;" % (
- queryColumnName, queryTableName, whereParametrType,
- whereParametrValue, parametrTypeAnd, parametrAnd)
- logger.debug("Query : " + queryStr)
- cur.execute(queryStr)
- if (fetchNum == 0):
- result = cur.fetchall()
- elif (fetchNum == 1):
- result = cur.fetchone()
- if(result.find("',)") != -1): # formatting strings e.g uuid
- result = result.partition('\'')[-1].rpartition('\'')[0]
- elif(result.find(",)") != -1): # formatting ints e.g id
- result = result.partition('(')[-1].rpartition(',')[0]
- dbConn.close()
- return result
- # If failed - count the failure and add the error to list of errors.
- except BaseException:
- errorMsg = "select_where_is_bigger FAILED "
- raise Exception(errorMsg, "select_where_is_bigger")
-
- @staticmethod
- def update_where(
- queryTableName,
- setParametrType,
- setparametrValue,
- whereParametrType,
- whereParametrValue):
- try:
- dbConn = psycopg2.connect(
- DBGeneral.return_db_native_connection('em_db'))
- dbConn = dbConn
- cur = dbConn.cursor()
- queryStr = "UPDATE %s SET %s = '%s' Where %s = '%s';" % (
- queryTableName, setParametrType, setparametrValue,
- whereParametrType, whereParametrValue)
- cur.execute(queryStr)
- dbConn.commit()
- logger.debug("Query : " + queryStr)
- # If failed - count the failure and add the error to list of errors.
- except Exception as e:
- errorMsg = "Could not Update User"
- logger.debug(e)
- raise Exception(errorMsg, "Update")
- finally:
- dbConn.close()
-
- @staticmethod
- def update_by_query(queryStr):
- try:
- dbConn = psycopg2.connect(
- DBGeneral.return_db_native_connection('em_db'))
- dbConn = dbConn
- cur = dbConn.cursor()
- logger.debug("Query : " + queryStr)
- cur.execute(queryStr)
- dbConn.commit()
- dbConn.close()
- # If failed - count the failure and add the error to list of errors.
- except BaseException:
- errorMsg = "Could not Update User"
- raise Exception(errorMsg, "Update")
-
- @staticmethod
- def update_where_and(
- queryTableName,
- setParametrType,
- parametrValue,
- changeToValue,
- whereParametrType,
- setParametrType2,
- setParametrValue2,
- whereParametrType2,
- whereParametrValue2):
- try:
- dbConn = psycopg2.connect(
- DBGeneral.return_db_native_connection('em_db'))
- dbConn = dbConn
- cur = dbConn.cursor()
- queryStr = "UPDATE %s SET " % queryTableName +\
- "%s = '%s', " % (setParametrType, changeToValue) +\
- "%s = '%s' Where " % (setParametrType2, setParametrValue2) +\
- "%s = '%s' " % (whereParametrType, parametrValue) +\
- "and %s = '%s';" % (whereParametrType2, whereParametrValue2)
- logger.debug("Query : " + queryStr)
- cur.execute(queryStr)
- dbConn.commit()
- dbConn.close()
- # If failed - count the failure and add the error to list of errors.
- except BaseException:
- errorMsg = "Could not Update User"
- raise Exception(errorMsg, "Update")
-
- @staticmethod
- def list_format(un_listed):
- un_listed = un_listed[1:-1]
- un_listed = un_listed.replace("',), ('", "|||")
- un_listed = un_listed.replace("(u'", "") # Format list
- un_listed = un_listed[1:-1].replace("('", "") # Format list
- un_listed = un_listed.replace("',)", "") # Format list
- listed = un_listed[1:-2].split("|||")
- return listed
-
- @staticmethod
- def get_vendors_list():
- # Select approved vendors from db.
- vendors_list = DBGeneral.select_where(
- "name", "ice_vendor", "public", "TRUE", 0)
- return vendors_list
diff --git a/services/database/db_user.py b/services/database/db_user.py
deleted file mode 100644
index 10d02ff..0000000
--- a/services/database/db_user.py
+++ /dev/null
@@ -1,515 +0,0 @@
-
-# ============LICENSE_START==========================================
-# org.onap.vvp/test-engine
-# ===================================================================
-# Copyright © 2017 AT&T Intellectual Property. All rights reserved.
-# ===================================================================
-#
-# Unless otherwise specified, all software contained herein is licensed
-# under the Apache License, Version 2.0 (the “License”);
-# you may not use this software except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-#
-#
-# Unless otherwise specified, all documentation contained herein is licensed
-# under the Creative Commons License, Attribution 4.0 Intl. (the “License”);
-# you may not use this documentation except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# https://creativecommons.org/licenses/by/4.0/
-#
-# Unless required by applicable law or agreed to in writing, documentation
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-# ============LICENSE_END============================================
-#
-# ECOMP is a trademark and service mark of AT&T Intellectual Property.
-import time
-
-from django.conf import settings
-import psycopg2
-
-from services.constants import Constants
-from services.database.db_bridge import DBBridge
-from services.database.db_general import DBGeneral
-from services.database.db_virtual_function import DBVirtualFunction
-from services.frontend.base_actions.wait import Wait
-from services.logging_service import LoggingServiceFactory
-from services.session import session
-
-
-logger = LoggingServiceFactory.get_logger()
-
-
-class DBUser:
-
- @staticmethod
- def get_activation_url(email):
- # Fetch one user ID.
- uuid = DBUser.select_user_uuid(email)
- # Fetch one user ID.
- index = DBGeneral.select_where_email("id", "auth_user", email)
- activation_token = DBGeneral.select_where(
- "activation_token", "ice_custom_user", "user_ptr_id", index, 1)
- # / activate /:userID /:token
- activationUrl = settings.ICE_PORTAL_URL + '/#/activate/' + \
- str(uuid) + '/' + str(activation_token)
- logger.debug("activationUrl :" + activationUrl)
- return activationUrl
-
- @staticmethod
- def get_contact_signup_url(
- invite_token,
- uuid,
- email,
- fullName,
- phoneNum,
- companyName):
- companyId = DBGeneral.select_where(
- "uuid", "ice_vendor", "name", companyName, 1)
- signUpURLforContact = settings.ICE_PORTAL_URL + \
- "#/signUp?invitation=" + invite_token + \
- "&email=" + email + "&full_name=" + fullName + \
- "&phone_number=" + phoneNum + "&company=" + companyId
- logger.debug("SignUpURLforContact :" + signUpURLforContact)
- return signUpURLforContact
-
- @staticmethod
- def select_invitation_token(
- queryColumnName,
- queryTableName,
- whereParametrType,
- whereParametrValue,
- email,
- fetchNum):
- try:
- dbConn = psycopg2.connect(
- DBGeneral.return_db_native_connection('em_db'))
- dbConn = dbConn
- cur = dbConn.cursor()
- queryStr = \
- "select %s from %s Where %s = '%s' and email = '%s' ;" % (
- queryColumnName, queryTableName, whereParametrType,
- whereParametrValue, email)
- logger.debug("Query : " + queryStr)
- cur.execute(queryStr)
- if (fetchNum == 0):
- result = str(cur.fetchall())
- elif (fetchNum == 1):
- result = str(cur.fetchone())
- if(result.find("',)") != -1): # formatting strings e.g uuid
- result = result.partition('\'')[-1].rpartition('\'')[0]
- elif(result.find(",)") != -1): # formatting ints e.g id
- result = result.partition('(')[-1].rpartition(',')[0]
- dbConn.close()
- if result is None:
- errorMsg = "select_where_pr_state FAILED "
- logger.error(errorMsg)
- raise
- logger.debug("Query result: " + str(result))
- return result
- # If failed - count the failure and add the error to list of errors.
- except BaseException:
- errorMsg = "select_where FAILED "
- raise Exception(errorMsg, "select_where")
-
- @staticmethod
- def get_el_name(vfName):
- try:
- # Fetch one AT&T user ID.
- engagement_id = DBVirtualFunction.select_eng_uuid(vfName)
- engagement_manual_id = DBGeneral.select_where(
- "engagement_manual_id", "ice_engagement", "uuid",
- engagement_id, 1)
- reviewer_id = DBGeneral.select_where(
- "reviewer_id",
- "ice_engagement",
- "engagement_manual_id",
- engagement_manual_id,
- 1)
- engLeadFullName = DBGeneral.select_where_and(
- "full_name", "ice_user_profile", "id", reviewer_id,
- "role_id", "2", 1)
- return engLeadFullName
- # If failed - count the failure and add the error to list of errors.
- except BaseException:
- errorMsg = "get_el_name FAILED "
- raise Exception(errorMsg, "get_el_name")
-
- @staticmethod
- def get_email_by_full_name(fullname):
- # try:
- query_str = "select email from ice_user_profile where " +\
- "full_name = '%s';" % (fullname)
- user_email = DBGeneral.select_query(query_str)
- return user_email
-
- @staticmethod
- def select_recent_vf_of_user(user_uuid, fetchNum):
- try:
- dbConn = psycopg2.connect(
- DBGeneral.return_db_native_connection('em_db'))
- dbConn = dbConn
- cur = dbConn.cursor()
- queryStr = "SELECT vf_id FROM public.ice_recent_engagement " +\
- "where user_uuid = '%s' order by last_update " % user_uuid +\
- "desc limit 20;"
- logger.debug("Query : " + queryStr)
- cur.execute(queryStr)
- if (fetchNum == 0):
- result = str(cur.fetchall())
- elif (fetchNum == 1):
- result = str(cur.fetchone())
- if(result.find("',)") != -1): # formatting strings e.g uuid
- result = result.partition('\'')[-1].rpartition('\'')[0]
- elif(result.find(",)") != -1): # formatting ints e.g id
- result = result.partition('(')[-1].rpartition(',')[0]
- dbConn.close()
- logger.debug("Query result: " + str(result))
- return result
- # If failed - count the failure and add the error to list of errors.
- except BaseException:
- errorMsg = "select_where FAILED "
- raise Exception(errorMsg, "select_where")
-
- @staticmethod
- def select_el_email(vfName):
- try:
- # Fetch one AT&T user ID.
- engagement_id = DBVirtualFunction.select_eng_uuid(vfName)
- engagement_manual_id = DBGeneral.select_where(
- "engagement_manual_id", "ice_engagement", "uuid",
- engagement_id, 1)
- reviewer_id = DBGeneral.select_where(
- "reviewer_id",
- "ice_engagement",
- "engagement_manual_id",
- engagement_manual_id,
- 1)
- engLeadEmail = DBGeneral.select_where_and(
- "email", "ice_user_profile", "id", reviewer_id, "role_id",
- "2", 1)
- return engLeadEmail
- # If failed - count the failure and add the error to list of errors.
- except BaseException:
- errorMsg = "select_el_email FAILED "
- raise Exception(errorMsg, "select_el_email")
-
- @staticmethod
- def select_user_native_id(email):
- try:
- # Fetch one AT&T user ID.
- engLeadId = DBUser.select_user_profile_property(email, "id")
- return engLeadId
- # If failed - count the failure and add the error to list of errors.
- except BaseException:
- errorMsg = "select_user_native_id FAILED "
- raise Exception(errorMsg, "select_user_native_id")
-
- @staticmethod
- def select_personal_next_step(email):
- user_id = DBUser.select_user_native_id(email)
- return DBGeneral.select_where(
- "uuid", "ice_next_step", "owner_id", user_id, 1)
-
- @staticmethod
- def select_pr_email(vfName):
- try:
- # Fetch one AT&T user ID.
- engagement_id = DBVirtualFunction.select_eng_uuid(vfName)
- engagement_manual_id = DBGeneral.select_where(
- "engagement_manual_id", "ice_engagement", "uuid",
- engagement_id, 1)
- reviewer_id = DBGeneral.select_where(
- "peer_reviewer_id",
- "ice_engagement",
- "engagement_manual_id",
- engagement_manual_id,
- 1)
- engLeadEmail = DBGeneral.select_where(
- "email", "ice_user_profile", "id", reviewer_id, 1)
- return engLeadEmail
- # If failed - count the failure and add the error to list of errors.
- except BaseException:
- errorMsg = "select_el_email FAILED "
- raise Exception(errorMsg, "select_el_email")
-
- @staticmethod
- def get_notification_id_by_email(userEmail):
- uuid = DBGeneral.select_where_email(
- "id", "ice_user_profile", userEmail)
- notifIDs = DBGeneral.select_where(
- "uuid", "ice_notification", "user_id", uuid, 0)
- return notifIDs
-
- @staticmethod
- def get_not_seen_notifications_number_by_email(
- user_email, is_negative=False):
- user_id = DBGeneral.select_where_email(
- "id", Constants.DBConstants.IceTables.USER_PROFILE, user_email)
- notifications_number = DBGeneral.select_where_and(
- Constants.DBConstants.Queries.COUNT,
- Constants.DBConstants.IceTables.NOTIFICATION,
- "user_id",
- user_id,
- "is_read",
- "False",
- 1)
- if is_negative:
- counter = 0
- while notifications_number != "0" and counter <= Constants.\
- Dashboard.Avatar.Notifications.Count.RETRIES_NUMBER:
- notifications_number = DBGeneral.select_where_and(
- Constants.DBConstants.Queries.COUNT,
- Constants.DBConstants.IceTables.NOTIFICATION,
- "user_id",
- user_id,
- "is_read",
- "False",
- 1)
- time.sleep(1)
- counter += 1
- return notifications_number
-
- @staticmethod
- def get_eng_lead_email_per_enguuid(enguuid):
- reviewer_id = DBGeneral.select_where(
- "reviewer_id",
- Constants.DBConstants.IceTables.ENGAGEMENT,
- "uuid",
- enguuid,
- 1)
- engLeadEmail = DBGeneral.select_where(
- "email", Constants.DBConstants.IceTables.USER_PROFILE, "id",
- reviewer_id, 1)
- return engLeadEmail
-
- @staticmethod
- def select_all_user_engagements(engLeadID):
- try:
- dbConn = psycopg2.connect(
- DBGeneral.return_db_native_connection('em_db'))
- dbConn = dbConn
- cur = dbConn.cursor()
- queryStr = "select COUNT(*) from ice_engagement_engagement_team" +\
- " Where iceuserprofile_id = %s" % engLeadID +\
- " and (select " +\
- "engagement_stage from public.ice_engagement " +\
- "where uuid = engagement_id LIMIT 1) != 'Archived';"
-
- logger.debug("Query : " + queryStr)
- cur.execute(queryStr)
- result = cur.fetchall()
- dbConn.close()
- logger.debug("Query result: " + str(result))
- logger.debug(result[0][0])
- return result[0][0]
- # If failed - count the failure and add the error to list of errors.
- except BaseException:
- errorMsg = "select_user_engagements_by_stage FAILED "
- raise Exception(errorMsg, "select_user_engagements_by_stage")
-
- @staticmethod
- def select_user_engagements_by_stage(stage, engLeadID):
- try:
- dbConn = psycopg2.connect(
- DBGeneral.return_db_native_connection('em_db'))
- dbConn = dbConn
- cur = dbConn.cursor()
- queryStr = "select count(*) from ice_engagement INNER JOIN " +\
- "ice_engagement_engagement_team ON " +\
- "ice_engagement_engagement_team.engagement_id= " +\
- "ice_engagement.uuid Where " +\
- "(ice_engagement.engagement_stage " +\
- "= '%s') and " % stage +\
- "(ice_engagement_engagement_team.iceuserprofile_id = " +\
- "%s );" % engLeadID
- logger.debug("Query : " + queryStr)
- cur.execute(queryStr)
- result = cur.fetchall()
- dbConn.close()
- logger.debug("Query result: " + str(result))
- logger.debug(result[0][0])
- return result[0][0]
- # If failed - count the failure and add the error to list of errors.
- except BaseException:
- errorMsg = "select_user_engagements_by_stage FAILED "
- raise Exception(errorMsg, "select_user_engagements_by_stage")
-
- @staticmethod
- def set_new_temp_password(email):
- encodePass = DBGeneral.select_where_email(
- "password", "auth_user", Constants.Users.Admin.EMAIL)
- # Fetch one user ID.
- index = DBGeneral.select_where_email("id", "auth_user", email)
- DBGeneral.update_where(
- "ice_custom_user",
- "temp_password",
- encodePass,
- "user_ptr_id",
- index)
-
- @staticmethod
- def set_password_to_default(email):
- encodePass = DBGeneral.select_where_email(
- "password", "auth_user", Constants.Users.Admin.EMAIL)
- DBGeneral.update_where(
- "auth_user", "password", encodePass, "email", email)
-
- @staticmethod
- def select_el_not_in_engagement(el_name, pr_name):
- query_str = "select full_name from ice_user_profile where " +\
- "role_id = 2 and full_name != '%s' and full_name != '%s';" % (
- el_name, pr_name)
- new_user = DBGeneral.select_query(query_str)
- if new_user == 'None':
- new_user = DBUser.update_to_el_not_in_engagement()
- return new_user
-
- @staticmethod
- def select_user_uuid(email):
- user_uuid = DBUser.select_user_profile_property(email, "uuid")
- return user_uuid
-
- @staticmethod
- def select_access_key(email):
- access_key = DBUser.select_user_profile_property(
- email, "rgwa_access_key")
- return access_key
-
- @staticmethod
- def select_secret_key(email):
- secret_key = DBUser.select_user_profile_property(
- email, "rgwa_secret_key")
- return secret_key
-
- @staticmethod
- def update_to_el_not_in_engagement():
- query_str = "select uuid from ice_user_profile where role_id = 1 ;"
- user_uuid = DBGeneral.select_query(query_str)
- updatequery = "UPDATE ice_user_profile SET role_id=2 ,full_name" +\
- " = 'el_for_test' WHERE uuid = '%s' ;" % (
- user_uuid)
- DBGeneral.update_query(updatequery)
- updatequery = "UPDATE ice_user_profile SET role_id=2 WHERE " +\
- "full_name = '%s' ;" % (
- 'el_for_test')
- DBGeneral.update_query(updatequery)
- return 'el_for_test'
-
- @staticmethod
- def rollback_for_el_not_in_engagement():
- query_str = "select uuid from ice_user_profile where full_name = " +\
- "'el_for_test';"
- user_uuid = DBGeneral.select_query(query_str)
- fullName = DBBridge.helper_rand_string("randomString")
- updatequery = "UPDATE ice_user_profile SET role_id=1,full_name " +\
- "= '%s' WHERE uuid = '%s' ;" % (fullName, user_uuid)
- DBGeneral.update_query(updatequery)
-
- @staticmethod
- def set_engagement_peer_reviewer(engagement_uuid, email):
- user_uuid = DBUser.select_user_uuid(email)
- update_query = "UPDATE ice_user_profile SET role_id=2 WHERE " +\
- "uuid = '%s';" % user_uuid
- DBGeneral.update_query(update_query)
-
- user_id = DBGeneral.select_query(
- "SELECT id FROM ice_user_profile WHERE uuid = '%s';" % user_uuid)
- update_query = "UPDATE ice_engagement SET peer_reviewer_id=%s " +\
- "WHERE uuid = '%s';" % (
- user_id, engagement_uuid)
- DBGeneral.update_query(update_query)
-
- @staticmethod
- def select_user_profile_property(user_email, property_name):
- return DBGeneral.select_where(
- property_name,
- "ice_user_profile",
- "email",
- user_email,
- 1)
-
- @staticmethod
- def validate_user_profile_settings_in_db(user_email, checked):
- Wait.page_has_loaded()
- regular_email_updates = DBUser.select_user_profile_property(
- user_email, 'regular_email_updates')
- DBBridge.helper_internal_assert(regular_email_updates, checked)
- email_updates_on_every_notification = \
- DBUser.select_user_profile_property(
- user_email, 'email_updates_on_every_notification')
- DBBridge.helper_internal_assert(
- email_updates_on_every_notification, checked)
- email_updates_daily_digest = DBUser.select_user_profile_property(
- user_email, 'email_updates_daily_digest')
- DBBridge.helper_internal_assert(
- email_updates_daily_digest, not checked)
-
- @staticmethod
- def retrieve_admin_ssh_from_db():
- ssh_key = DBGeneral.select_where(
- 'ssh_public_key', Constants.DBConstants.IceTables.USER_PROFILE,
- 'email', Constants.Users.Admin.EMAIL, 1)
- return ssh_key
-
- @staticmethod
- def get_access_key(user_uuid):
- counter = 0
- access_key = DBGeneral.select_where(
- "rgwa_access_key",
- Constants.DBConstants.IceTables.USER_PROFILE,
- "uuid",
- user_uuid,
- 1)
- while access_key == "None" and counter <= \
- Constants.RGWAConstants.RETRIES_NUMBER:
- time.sleep(session.wait_until_time_pause)
- logger.debug(
- "rgwa_access_key are not ready yet, trying again (%s of 20)" %
- counter)
- access_key = DBGeneral.select_where(
- "rgwa_access_key",
- Constants.DBConstants.IceTables.USER_PROFILE,
- "uuid",
- user_uuid,
- 1)
- counter += 1
- return access_key
-
- @staticmethod
- def get_access_secret(user_uuid):
- counter = 0
- access_secret = DBGeneral.select_where(
- "rgwa_secret_key",
- Constants.DBConstants.IceTables.USER_PROFILE,
- "uuid",
- user_uuid,
- 1)
- while access_secret == "None" and counter <= Constants.\
- RGWAConstants.RETRIES_NUMBER:
- time.sleep(session.wait_until_time_pause)
- logger.debug(
- "rgwa_secret_key are not ready yet, trying again (%s of 100)" %
- counter)
- access_secret = DBGeneral.select_where(
- "rgwa_secret_key",
- Constants.DBConstants.IceTables.USER_PROFILE,
- "uuid",
- user_uuid,
- 1)
-
- counter += 1
- return access_secret
diff --git a/services/database/db_virtual_function.py b/services/database/db_virtual_function.py
deleted file mode 100644
index f61d1b7..0000000
--- a/services/database/db_virtual_function.py
+++ /dev/null
@@ -1,286 +0,0 @@
-
-# ============LICENSE_START==========================================
-# org.onap.vvp/test-engine
-# ===================================================================
-# Copyright © 2017 AT&T Intellectual Property. All rights reserved.
-# ===================================================================
-#
-# Unless otherwise specified, all software contained herein is licensed
-# under the Apache License, Version 2.0 (the “License”);
-# you may not use this software except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-#
-#
-# Unless otherwise specified, all documentation contained herein is licensed
-# under the Creative Commons License, Attribution 4.0 Intl. (the “License”);
-# you may not use this documentation except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# https://creativecommons.org/licenses/by/4.0/
-#
-# Unless required by applicable law or agreed to in writing, documentation
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-# ============LICENSE_END============================================
-#
-# ECOMP is a trademark and service mark of AT&T Intellectual Property.
-import uuid
-
-from django.conf import settings
-import psycopg2
-
-from services.constants import Constants
-from services.database.db_bridge import DBBridge
-from services.database.db_general import DBGeneral
-from services.logging_service import LoggingServiceFactory
-
-
-logger = LoggingServiceFactory.get_logger()
-
-
-class DBVirtualFunction:
-
- @staticmethod
- def insert_ecomp_release(uuid, name, ui_visibility="TRUE"):
- try:
- queryTableName = "ice_ecomp_release"
- # Connect to General 'default'.
- dbConn = psycopg2.connect(
- DBGeneral.return_db_native_connection("em_db"))
- dbConn = dbConn
- cur = dbConn.cursor()
- logger.debug("DATABASE_TYPE: " + settings.DATABASE_TYPE)
- except Exception as e:
- errorMsg = "Failed to create connection to General: " + str(e)
- raise Exception(errorMsg)
- try:
- logger.debug("DATABASE_TYPE: " + settings.DATABASE_TYPE)
- # Create INSERT query.
- queryStr = "INSERT INTO %s " % queryTableName +\
- "(""uuid, name, weight, ui_visibility"") VALUES " +\
- "('%s', '%s', " % (uuid, name) +\
- "'%s', '%s');" % (0, ui_visibility)
- logger.debug("Query: " + queryStr)
- cur.execute(queryStr) # Execute query.
- dbConn.commit()
- logger.debug("Test results are in General now.")
- except Exception as e:
- errorMsg = "Failed to insert ECOMP release to General:" + str(e)
- raise Exception(errorMsg)
- dbConn.close()
-
- @staticmethod
- def delete_ecomp_release(uuid, name):
- try:
- queryTableName = "ice_ecomp_release"
- # Connect to General 'default'.
- dbConn = psycopg2.connect(
- DBGeneral.return_db_native_connection("em_db"))
- dbConn = dbConn
- cur = dbConn.cursor()
- except Exception as e:
- errorMsg = "Failed to create connection to DBGeneral.because :" + \
- str(e)
- raise Exception(errorMsg)
- try:
- # Create INSERT query.
- queryStr = "DELETE FROM %s WHERE uuid = '%s';" % (
- queryTableName, uuid)
- logger.debug("Query: " + queryStr)
- cur.execute(queryStr) # Execute query.
- dbConn.commit()
- logger.debug("Test results are in General now.")
- except Exception as e:
- errorMsg = "Failed to delete ECOMP release from General ." +\
- " because :" + \
- str(e)
- raise Exception(errorMsg)
- raise
- dbConn.close()
-
- @staticmethod
- def select_next_steps_ids(engagement_uuid):
- ice_next_steps = DBGeneral.select_where(
- "uuid", "ice_next_step", "engagement_id", engagement_uuid, 0)
- return ice_next_steps
-
- @staticmethod
- def select_next_steps_uuids_by_stage(engagement_uuid, engagement_stage):
- query = "SELECT uuid FROM %s WHERE " % (
- Constants.DBConstants.IceTables.NEXT_STEP) + "engagement_id=" +\
- "'%s' AND engagement_stage='%s' ORDER BY position;" % (
- engagement_uuid, engagement_stage)
- return DBGeneral.select_query(query, "list", 0)
-
- @staticmethod
- def update_next_step_position(next_step_uuid, new_index):
- DBGeneral.update_where(
- "ice_next_step", "position", new_index, "uuid", next_step_uuid)
-
- @staticmethod
- def select_next_step_description(next_step_uuid):
- return DBGeneral.select_where(
- "description",
- "ice_next_step",
- "uuid",
- next_step_uuid,
- 1)
-
- @staticmethod
- def select_eng_uuid(vf_name):
- return DBGeneral.select_where(
- "engagement_id", "ice_vf", "name", vf_name, 1)
-
- @staticmethod
- def select_engagment_uuid_by_vf_name(vfName):
- engagement_id = DBGeneral.select_where(
- "engagement_id", "ice_vf", "name", vfName, 1)
- engagement_manual_id = DBGeneral.select_where(
- "engagement_manual_id", "ice_engagement", "uuid", engagement_id, 1)
- enguuid = DBGeneral.select_where(
- "uuid",
- "ice_engagement",
- "engagement_manual_id",
- engagement_manual_id,
- 1)
- return enguuid
-
- @staticmethod
- def select_vf_version_by_vf_name(vfName):
- queryStr = "SELECT version FROM ice_vf WHERE name= '%s';" % vfName
- version_name = str(DBGeneral.select_query(queryStr))
- return version_name
-
- @staticmethod
- def select_vf_name_by_vf_version(version_name):
- queryofname = "SELECT name FROM ice_vf WHERE " +\
- "version= '%s';" % version_name
- vfNameDb = str(DBGeneral.select_query(queryofname))
- return vfNameDb
-
- @staticmethod
- def return_expected_steps(engagement_uuid, stage, user_email):
- steps_uuids = DBVirtualFunction.select_next_steps_uuids_by_stage(
- engagement_uuid, stage)
- personal_step_uuid = DBBridge.select_personal_next_step(user_email)
- DBVirtualFunction.update_next_step_position(personal_step_uuid, 1)
- steps_uuids.insert(0, personal_step_uuid)
- return steps_uuids
-
- @staticmethod
- def get_engagement():
- """Use this function instead of creating a new """ +\
- """engagement where no need to"""
- queryStr = "SELECT DISTINCT ice_engagement.uuid, " +\
- "engagement_manual_id, ice_vf.name, ice_user_profile.full_name, \
- ice_user_profile.email, reviewer_table.full_name, " +\
- "reviewer_table.email, \
- ice_deployment_target.version, ice_ecomp_release.name \
- FROM ice_engagement LEFT JOIN ice_vf ON engagement_id " +\
- "= ice_engagement.uuid \
- LEFT JOIN ice_user_profile reviewer_table ON " +\
- "reviewer_table.id = ice_engagement.reviewer_id \
- LEFT JOIN ice_user_profile ON ice_user_profile.id = " +\
- "ice_engagement.peer_reviewer_id \
- LEFT JOIN ice_deployment_target ON " +\
- "ice_deployment_target.uuid = " +\
- "ice_vf.deployment_target_id \
- LEFT JOIN ice_ecomp_release ON " +\
- "ice_ecomp_release.uuid = ice_vf.ecomp_release_id \
- WHERE ice_user_profile.id IS NOT NULL LIMIT 1;"
- list_of_values = DBGeneral.select_query(queryStr, return_type="list")
- list_of_keys = [
- "engagement_uuid",
- "engagement_manual_id",
- "vfName",
- "pr_name",
- "pr_email",
- "el_name",
- "el_email",
- "target_aic",
- "ecomp_release"]
- return dict(zip(list_of_keys, list_of_values))
-
- @staticmethod
- def insert_aic_version(ui_visibility="TRUE"):
- new_aic_version = {
- "uuid": str(
- uuid.uuid4()),
- "name": "AIC",
- "version": DBBridge.helper_rand_string(
- "randomNumber",
- 2),
- "ui_visibility": ui_visibility,
- "weight": 0}
- queryStr = "INSERT INTO public.ice_deployment_target( \
- uuid, name, version, ui_visibility, weight) \
- VALUES " +\
- "('%s', '%s', '%s', '%s', %s);" % (
- new_aic_version['uuid'],
- new_aic_version['name'],
- new_aic_version['version'],
- new_aic_version['ui_visibility'],
- new_aic_version['weight'])
- DBGeneral.insert_query(queryStr)
- return new_aic_version
-
- @staticmethod
- def delete_aic_version(aic_uuid):
- DBGeneral.insert_query(
- "DELETE FROM public.ice_deployment_target WHERE uuid='%s';" %
- aic_uuid)
-
- @staticmethod
- def change_aic_version_weight(new_weight, old_weight):
- DBGeneral.insert_query(
- "UPDATE public.ice_deployment_target " +
- "SET weight=%s " % new_weight +
- "WHERE weight=%s" % old_weight)
-
- @staticmethod
- def change_ecomp_release_weight(new_weight, old_weight):
- DBGeneral.insert_query(
- "UPDATE public.ice_ecomp_release SET weight=%s WHERE weight=%s" %
- (new_weight, old_weight))
-
- @staticmethod
- def select_aic_version_uuid(aic_version):
- return DBGeneral.select_where(
- "uuid", "ice_deployment_target", "version", aic_version, 1)
-
- @staticmethod
- def select_ecomp_release_uuid(ecomp_release):
- return DBGeneral.select_where(
- "uuid", "ice_ecomp_release", "name", ecomp_release, 1)
-
- @staticmethod
- def add_admin_to_eng_team(eng_uuid):
- admin_db_id = DBGeneral.select_where(
- 'id',
- Constants.DBConstants.IceTables.USER_PROFILE,
- 'email',
- Constants.Users.Admin.EMAIL,
- 1)
- queryStr = "INSERT INTO public.ice_engagement_engagement_team" +\
- "(engagement_id, iceuserprofile_id) VALUES ('%s', '%s');" % (
- eng_uuid, admin_db_id)
- logger.debug("add_admin_to_eng_team Query: %s" % queryStr)
- DBGeneral.insert_query(queryStr)
-
- @staticmethod
- def remove_engagement_from_recent(vf_uuid):
- DBGeneral.insert_query(
- "DELETE FROM %s WHERE vf_id='%s'" % (Constants.DBConstants.
- IceTables.RECENT, vf_uuid))