aboutsummaryrefslogtreecommitdiffstats
path: root/services/database
diff options
context:
space:
mode:
Diffstat (limited to 'services/database')
-rw-r--r--services/database/__init__.py4
-rw-r--r--services/database/db_bridge.py15
-rw-r--r--services/database/db_checklist.py270
-rw-r--r--services/database/db_cms.py75
-rwxr-xr-xservices/database/db_general.py145
-rw-r--r--services/database/db_user.py238
-rw-r--r--services/database/db_virtual_function.py123
7 files changed, 625 insertions, 245 deletions
diff --git a/services/database/__init__.py b/services/database/__init__.py
index 30d7152..32b601a 100644
--- a/services/database/__init__.py
+++ b/services/database/__init__.py
@@ -1,5 +1,5 @@
-
-# ============LICENSE_START==========================================
+
+# ============LICENSE_START==========================================
# org.onap.vvp/test-engine
# ===================================================================
# Copyright © 2017 AT&T Intellectual Property. All rights reserved.
diff --git a/services/database/db_bridge.py b/services/database/db_bridge.py
index fc765c7..1eb79fa 100644
--- a/services/database/db_bridge.py
+++ b/services/database/db_bridge.py
@@ -1,5 +1,5 @@
-
-# ============LICENSE_START==========================================
+
+# ============LICENSE_START==========================================
# org.onap.vvp/test-engine
# ===================================================================
# Copyright © 2017 AT&T Intellectual Property. All rights reserved.
@@ -36,16 +36,21 @@
# ============LICENSE_END============================================
#
# ECOMP is a trademark and service mark of AT&T Intellectual Property.
+
+
class DBBridge:
"""
- This class helps to use functions inside classes with circular import (dependencies).
- Use this class only when there is circular import in one of the DB services.
+ This class helps to use functions inside classes
+ with circular import (dependencies).
+ Use this class only when there is circular
+ import in one of the DB services.
"""
@staticmethod
def select_personal_next_step(user_email):
- """select_personal_next_step: Originally can be found under DBUser class."""
+ """select_personal_next_step: Originally """ +\
+ """can be found under DBUser class."""
from services.database.db_user import DBUser
return DBUser.select_personal_next_step(user_email)
diff --git a/services/database/db_checklist.py b/services/database/db_checklist.py
index 04f8a44..0f8fd6e 100644
--- a/services/database/db_checklist.py
+++ b/services/database/db_checklist.py
@@ -54,14 +54,21 @@ logger = LoggingServiceFactory.get_logger()
class DBChecklist:
@staticmethod
- def select_where_approval_state(queryColumnName, queryTableName, whereParametrType, whereParametrValue, fetchNum):
+ def select_where_approval_state(
+ queryColumnName,
+ queryTableName,
+ whereParametrType,
+ whereParametrValue,
+ fetchNum):
try:
dbConn = psycopg2.connect(
DBGeneral.return_db_native_connection('em_db'))
dbConn = dbConn
cur = dbConn.cursor()
- queryStr = "select %s from %s Where %s = '%s' and state = 'approval';" % (
- queryColumnName, queryTableName, whereParametrType, whereParametrValue)
+ queryStr = \
+ "select %s from %s " % (queryColumnName, queryTableName) +\
+ "Where %s = '%s'" % (whereParametrType, whereParametrValue) +\
+ " and state = 'approval';"
logger.debug("Query : " + queryStr)
cur.execute(queryStr)
if (fetchNum == 0):
@@ -74,25 +81,33 @@ class DBChecklist:
result = result.partition('(')[-1].rpartition(',')[0]
dbConn.close()
logger.debug("Query result: " + str(result))
- if result == None:
+ if result is None:
errorMsg = "select_where_approval_state FAILED "
logger.error(errorMsg)
raise
return result
# If failed - count the failure and add the error to list of errors.
- except:
+ except BaseException:
errorMsg = "select_where_approval_state FAILED "
raise Exception(errorMsg, "select_where_approval_state FAILED")
@staticmethod
- def select_where_pr_state(queryColumnName, queryTableName, whereParametrType, whereParametrValue, fetchNum):
+ def select_where_pr_state(
+ queryColumnName,
+ queryTableName,
+ whereParametrType,
+ whereParametrValue,
+ fetchNum):
try:
dbConn = psycopg2.connect(
DBGeneral.return_db_native_connection('em_db'))
dbConn = dbConn
cur = dbConn.cursor()
- queryStr = "select %s from %s Where %s = '%s' and state = 'peer_review';" % (
- queryColumnName, queryTableName, whereParametrType, whereParametrValue)
+ queryStr = \
+ "select %s from %s " % (queryColumnName, queryTableName) +\
+ "Where %s = '%s' and " % (
+ whereParametrType, whereParametrValue) +\
+ "state = 'peer_review';"
logger.debug("Query : " + queryStr)
cur.execute(queryStr)
if (fetchNum == 0):
@@ -104,26 +119,33 @@ class DBChecklist:
elif(result.find(",)") != -1): # formatting ints e.g id
result = result.partition('(')[-1].rpartition(',')[0]
dbConn.close()
- if result == None:
+ if result is None:
errorMsg = "select_where_pr_state FAILED "
logger.error(errorMsg)
raise
logger.debug("Query result: " + str(result))
return result
# If failed - count the failure and add the error to list of errors.
- except:
+ except BaseException:
errorMsg = "select_where FAILED "
raise Exception(errorMsg, "select_where")
@staticmethod
- def select_where_cl_not_archive(queryColumnName, queryTableName, whereParametrType, whereParametrValue, fetchNum):
+ def select_where_cl_not_archive(
+ queryColumnName,
+ queryTableName,
+ whereParametrType,
+ whereParametrValue,
+ fetchNum):
try:
dbConn = psycopg2.connect(
DBGeneral.return_db_native_connection('em_db'))
dbConn = dbConn
cur = dbConn.cursor()
- queryStr = "select %s from %s Where %s = '%s' and state != 'archive';" % (
- queryColumnName, queryTableName, whereParametrType, whereParametrValue)
+ queryStr = \
+ "select %s from %s " % (queryColumnName, queryTableName) +\
+ "Where %s = '%s'" % (whereParametrType, whereParametrValue) +\
+ "and state != 'archive';"
logger.debug("Query : " + queryStr)
cur.execute(queryStr)
if (fetchNum == 0):
@@ -138,19 +160,25 @@ class DBChecklist:
logger.debug("Query result: " + str(result))
return result
# If failed - count the failure and add the error to list of errors.
- except:
+ except BaseException:
errorMsg = "select_where FAILED "
raise Exception(errorMsg, "select_where")
@staticmethod
- def select_native_where(queryColumnName, queryTableName, whereParametrType, whereParametrValue, fetchNum):
+ def select_native_where(
+ queryColumnName,
+ queryTableName,
+ whereParametrType,
+ whereParametrValue,
+ fetchNum):
try:
dbConn = psycopg2.connect(
DBGeneral.return_db_native_connection('em_db'))
dbConn = dbConn
cur = dbConn.cursor()
queryStr = "select %s from %s Where %s = '%s';" % (
- queryColumnName, queryTableName, whereParametrType, whereParametrValue)
+ queryColumnName, queryTableName, whereParametrType,
+ whereParametrValue)
logger.debug("Query : " + queryStr)
cur.execute(queryStr)
if (fetchNum == 0):
@@ -165,7 +193,7 @@ class DBChecklist:
logger.debug("Query result: " + str(result))
return result
# If failed - count the failure and add the error to list of errors.
- except:
+ except BaseException:
errorMsg = "select_where FAILED "
raise Exception(errorMsg, "select_where")
@@ -176,14 +204,15 @@ class DBChecklist:
DBGeneral.return_db_native_connection('em_db'))
dbConn = dbConn
cur = dbConn.cursor()
- queryStr = "UPDATE ice_checklist SET state='review' Where name= '%s' and state= 'pending';" % (
- queryTableName)
+ queryStr = "UPDATE ice_checklist SET state='review' Where " +\
+ "name= '%s' and state= 'pending';" % (
+ queryTableName)
logger.debug("Query : " + queryStr)
cur.execute(queryStr)
dbConn.commit()
dbConn.close()
# If failed - count the failure and add the error to list of errors.
- except:
+ except BaseException:
errorMsg = "Could not Update User"
raise Exception(errorMsg, "Update")
@@ -194,8 +223,10 @@ class DBChecklist:
DBGeneral.return_db_native_connection('em_db'))
dbConn = dbConn
cur = dbConn.cursor()
- queryStr = "UPDATE ice_checklist_decision SET review_value='approved' , peer_review_value='approved' Where checklist_id = '%s';" % (
- whereParametrValue)
+ queryStr = "UPDATE ice_checklist_decision SET " +\
+ "review_value='approved' , peer_review_value='approved' " +\
+ "Where checklist_id = '%s';" % (
+ whereParametrValue)
logger.debug(queryStr)
cur.execute(queryStr)
dbConn.commit()
@@ -226,7 +257,7 @@ class DBChecklist:
break
return result
# If failed - count the failure and add the error to list of errors.
- except:
+ except BaseException:
errorMsg = "is_archive FAILED "
raise Exception(errorMsg, "is_archive")
@@ -248,14 +279,15 @@ class DBChecklist:
@staticmethod
def get_admin_email(checklistUuid):
try:
+ # Fetch one AT&T user ID.
owner_id = DBChecklist.select_where_approval_state(
- "owner_id", "ice_checklist", "uuid", checklistUuid, 1) # Fetch one AT&T user ID.
+ "owner_id", "ice_checklist", "uuid", checklistUuid, 1)
engLeadEmail = DBGeneral.select_where(
"email", "ice_user_profile", "id", owner_id, 1)
logger.debug("get_admin_email = " + engLeadEmail)
return engLeadEmail
# If failed - count the failure and add the error to list of errors.
- except:
+ except BaseException:
errorMsg = "get_admin_email FAILED "
raise Exception(errorMsg, "get_admin_email")
@@ -270,7 +302,7 @@ class DBChecklist:
logger.debug("getPreeReviewerEngLeadEmail = " + engLeadEmail)
return engLeadEmail
# If failed - count the failure and add the error to list of errors.
- except:
+ except BaseException:
errorMsg = "get_admin_email FAILED "
raise Exception(errorMsg, "get_owner_email")
@@ -279,83 +311,181 @@ class DBChecklist:
checklistTempid = DBGeneral.select_where(
"template_id", "ice_checklist", "name", checklistName, 1)
checklistLineItems = DBGeneral.select_where_and(
- "uuid", "ice_checklist_line_item", "line_type", "auto", "template_id", checklistTempid, 0)
+ "uuid",
+ "ice_checklist_line_item",
+ "line_type",
+ "auto",
+ "template_id",
+ checklistTempid,
+ 0)
for lineItem in checklistLineItems:
setParametrType2 = "peer_review_value"
setParametrValue2 = "approved"
whereParametrType2 = "lineitem_id"
whereParametrValue2 = lineItem
- DBGeneral.update_where_and("ice_checklist_decision", "review_value", checklistUuid, "approved",
- "checklist_id", setParametrType2, setParametrValue2, whereParametrType2, whereParametrValue2)
+ DBGeneral.update_where_and(
+ "ice_checklist_decision",
+ "review_value",
+ checklistUuid,
+ "approved",
+ "checklist_id",
+ setParametrType2,
+ setParametrValue2,
+ whereParametrType2,
+ whereParametrValue2)
@staticmethod
def checkChecklistIsUpdated():
- query = "select uuid from ice_checklist_section where template_id in (select template_id from ice_checklist_template where name='{template_name}') and name='{section_name}'".format(
- template_name=Constants.Dashboard.LeftPanel.EditChecklistTemplate.HEAT, section_name=Constants.Dashboard.LeftPanel.EditChecklistTemplate.HEAT)
+ query = "select uuid from ice_checklist_section where template_id " +\
+ "in (select template_id from ice_checklist_template where " +\
+ "name='{template_name}') and name='{section_name}'".format(
+ template_name=Constants.Dashboard.LeftPanel.
+ EditChecklistTemplate.HEAT, section_name=Constants.
+ Dashboard.LeftPanel.EditChecklistTemplate.HEAT)
return DBGeneral.select_query(query)
@staticmethod
def fetchEngByVfName(vfName):
# Fetch one AT&T user ID.
- return DBGeneral.select_where("engagement_id", "ice_vf", "name", vfName, 1)
+ return DBGeneral.select_where(
+ "engagement_id", "ice_vf", "name", vfName, 1)
@staticmethod
def fetchEngManIdByEngUuid(engagement_id):
- return DBGeneral.select_where("engagement_manual_id", "ice_engagement", "uuid", engagement_id, 1)
+ return DBGeneral.select_where(
+ "engagement_manual_id",
+ "ice_engagement",
+ "uuid",
+ engagement_id,
+ 1)
@staticmethod
def fetchChecklistByName(checklistName):
- query = "select uuid from ice_checklist where name='{cl_name}'".format(
- cl_name=checklistName)
+ query = "select uuid from ice_checklist where " +\
+ "name='{cl_name}'".format(
+ cl_name=checklistName)
return DBGeneral.select_query(query)
@staticmethod
def create_default_heat_teampleate():
- template_query = "INSERT INTO public.ice_checklist_template(uuid, name, category, version, create_time)"\
+ template_query = "INSERT INTO public.ice_checklist_template(uuid, " +\
+ "name, category, version, create_time)"\
"VALUES ('%s', '%s', '%s', '%s', '%s');" % (
- str(uuid4()), 'Editing Heat', 'first category', '1', timezone.now())
+ str(uuid4()), 'Editing Heat', 'first category', '1',
+ timezone.now())
DBGeneral.insert_query(template_query)
template_id = DBGeneral.select_query(
- "SELECT uuid FROM public.ice_checklist_template where name = 'Editing Heat'")
+ "SELECT uuid FROM public.ice_checklist_template where " +
+ "name = 'Editing Heat'")
# SECTIONS
- section1_query = "INSERT INTO public.ice_checklist_section(uuid, name, weight, description, validation_instructions, create_time, template_id) "\
- "VALUES ('%s', '%s', '%s', '%s', '%s','%s', '%s');" % (str(uuid4()), 'External References',
- '1', 'section descripyion', 'valid instructions', timezone.now(), template_id)
+ section1_query = "INSERT INTO public.ice_checklist_section(uuid, " +\
+ "name, weight, description, validation_instructions, " +\
+ "create_time, template_id) "\
+ "VALUES ('%s', '%s', '%s', '%s', '%s','%s', '%s');" % (
+ str(uuid4()), 'External References',
+ '1', 'section descripyion', 'valid instructions',
+ timezone.now(), template_id)
DBGeneral.insert_query(section1_query)
section1_id = DBGeneral.select_query(
- ("""SELECT uuid FROM public.ice_checklist_section where name = 'External References' and template_id = '{s}'""").format(s=template_id))
- section2_query = "INSERT INTO public.ice_checklist_section(uuid, name, weight, description, validation_instructions, create_time, template_id) "\
- "VALUES ('%s', '%s', '%s', '%s', '%s','%s', '%s');" % (str(uuid4()), 'Parameter Specification',
- '2', 'section descripyion', 'valid instructions', timezone.now(), template_id)
+ ("""SELECT uuid FROM public.ice_checklist_section """ +
+ """where name = 'External References' """ +
+ """and template_id = '{s}'""").format(
+ s=template_id))
+ section2_query = "INSERT INTO public.ice_checklist_section(uuid, " +\
+ "name, weight, description, validation_instructions, " +\
+ "create_time, template_id) "\
+ "VALUES ('%s', '%s', '%s', '%s', '%s','%s', '%s');" % (
+ str(uuid4()), 'Parameter Specification',
+ '2', 'section descripyion', 'valid instructions',
+ timezone.now(), template_id)
DBGeneral.insert_query(section2_query)
section2_id = DBGeneral.select_query(
- ("""SELECT uuid FROM public.ice_checklist_section where name = 'Parameter Specification' and template_id = '{s}'""").format(s=template_id))
+ ("""SELECT uuid FROM public.ice_checklist_section """ +
+ """where name = """ +
+ """'Parameter Specification' and template_id = '{s}'""").format(
+ s=template_id))
# Line items
- line_item1 = "INSERT INTO public.ice_checklist_line_item(uuid, name, weight, description, line_type, validation_instructions,create_time,section_id, template_id) "\
- "VALUES ('%s', '%s', '%s', '%s', '%s','%s', '%s', '%s', '%s');" % (str(uuid4()), 'Normal references', '1', 'Numeric parameters should include range and/or allowed values.', 'manual',
- 'Here are some useful tips for how to validate this item in the most awesome way:<br><br><ul><li>Here is my awesome tip 1</li><li>Here is my awesome tip 2</li><li>Here is my awesome tip 3</li></ul>', timezone.now(), section1_id, template_id)
+ line_item1 = \
+ "INSERT INTO public.ice_checklist_line_item(uuid, " +\
+ "name, weight, description, line_type, validation_instructions," +\
+ "create_time,section_id, template_id) "\
+ "VALUES ('%s', '%s', " % (str(uuid4()), 'Normal references') +\
+ "'%s', " % '1' +\
+ "'%s'," % 'Numeric parameters should include ' +\
+ 'range and/or allowed values.' +\
+ " '%s'," % 'manual', +\
+ "'%s'" % 'Here are some useful tips ' +\
+ 'for how to validate this item ' +\
+ 'in the most awesome way:<br><br><ul><li>Here is my ' +\
+ 'awesome tip 1</li><li>Here is my awesome tip 2</li><li>' +\
+ 'Here is my awesome tip 3</li></ul>' +\
+ ", '%s'" % timezone.now() +\
+ ", '%s'," % section1_id +\
+ " '%s');" % template_id
DBGeneral.insert_query(line_item1)
- line_item2 = "INSERT INTO public.ice_checklist_line_item(uuid, name, weight, description, line_type, validation_instructions,create_time, section_id, template_id) "\
- "VALUES ('%s', '%s', '%s', '%s', '%s','%s', '%s', '%s', '%s');" % (str(uuid4()), 'String parameters', '2', 'Numeric parameters should include range and/or allowed values.', 'auto',
- 'Here are some useful tips for how to validate this item in the most awesome way:<br><br><ul><li>Here is my awesome tip 1</li><li>Here is my awesome tip 2</li><li>Here is my awesome tip 3</li></ul>', timezone.now(), section2_id, template_id)
+ line_item2 = "INSERT INTO public.ice_checklist_line_item(uuid, " +\
+ "name, weight, description, line_type, validation_instructions," +\
+ "create_time, section_id, template_id) "\
+ "VALUES ('%s', '%s', '%s', '%s', '%s','%s', '%s', '%s', '%s');" % (
+ str(uuid4()), 'String parameters', '2',
+ 'Numeric parameters should include range ' +
+ 'and/or allowed values.', 'auto',
+ 'Here are some useful tips for how to validate this item ' +
+ 'in the most awesome way:<br><br><ul><li>Here is my ' +
+ 'awesome tip 1</li><li>Here is my awesome tip 2</li><li>' +
+ 'Here is my awesome tip 3</li></ul>', timezone.now(),
+ section2_id, template_id)
DBGeneral.insert_query(line_item2)
- line_item3 = "INSERT INTO public.ice_checklist_line_item(uuid, name, weight, description, line_type, validation_instructions,create_time,section_id, template_id) "\
- "VALUES ('%s', '%s', '%s', '%s', '%s','%s', '%s', '%s', '%s');" % (str(uuid4()), 'Numeric parameters', '3', 'Numeric parameters should include range and/or allowed values.', 'manual',
- 'Here are some useful tips for how to validate this item in the most awesome way:<br><br><ul><li>Here is my awesome tip 1</li><li>Here is my awesome tip 2</li><li>Here is my awesome tip 3</li></ul>', timezone.now(), section2_id, template_id)
+ line_item3 = "INSERT INTO public.ice_checklist_line_item(uuid, " +\
+ "name, weight, description, line_type, validation_instructions," +\
+ "create_time,section_id, template_id) "\
+ "VALUES ('%s', '%s', '%s', '%s', '%s','%s', " +\
+ "'%s', '%s', '%s');" % (
+ str(uuid4()), 'Numeric parameters', '3',
+ 'Numeric parameters should include range and/or ' +
+ 'allowed values.', 'manual',
+ 'Here are some useful tips for how to validate this item ' +
+ 'in the most awesome way:<br><br><ul><li>Here is my ' +
+ 'awesome tip 1</li><li>Here is my awesome tip 2</li><li>' +
+ 'Here is my awesome tip 3</li></ul>', timezone.now(),
+ section2_id, template_id)
DBGeneral.insert_query(line_item3)
- line_item4 = "INSERT INTO public.ice_checklist_line_item(uuid, name, weight, description, line_type, validation_instructions,create_time, section_id, template_id) "\
- "VALUES ('%s', '%s', '%s', '%s', '%s','%s', '%s', '%s', '%s');" % (str(uuid4()), 'VF image', '2', 'Numeric parameters should include range and/or allowed values.', 'auto',
- 'Here are some useful tips for how to validate this item in the most awesome way:<br><br><ul><li>Here is my awesome tip 1</li><li>Here is my awesome tip 2</li><li>Here is my awesome tip 3</li></ul>', timezone.now(), section1_id, template_id)
+ line_item4 = "INSERT INTO public.ice_checklist_line_item(uuid, " +\
+ "name, weight, description, line_type, " +\
+ "validation_instructions,create_time, section_id, " +\
+ "template_id) "\
+ "VALUES ('%s', '%s', '%s', '%s', '%s','%s', '%s', " +\
+ "'%s', '%s');" % (
+ str(uuid4()), 'VF image', '2',
+ 'Numeric parameters should include range and/or ' +
+ 'allowed values.', 'auto',
+ 'Here are some useful tips for how to validate this ' +
+ 'item in the most awesome way:<br><br><ul><li>Here is ' +
+ 'my awesome tip 1</li><li>Here is my awesome tip 2' +
+ '</li><li>Here is my awesome tip 3</li></ul>',
+ timezone.now(), section1_id, template_id)
DBGeneral.insert_query(line_item4)
- line_item5 = "INSERT INTO public.ice_checklist_line_item(uuid, name, weight, description, line_type, validation_instructions,create_time,section_id, template_id) "\
- "VALUES ('%s', '%s', '%s', '%s', '%s','%s', '%s', '%s', '%s');" % (str(uuid4()), 'Parameters', '1', 'Numeric parameters should include range and/or allowed values.', 'auto',
- 'Here are some useful tips for how to validate this item in the most awesome way:<br><br><ul><li>Here is my awesome tip 1</li><li>Here is my awesome tip 2</li><li>Here is my awesome tip 3</li></ul>', timezone.now(), section2_id, template_id)
+ line_item5 = "INSERT INTO public.ice_checklist_line_item(uuid, " +\
+ "name, weight, description, line_type, validation_instructions," +\
+ "create_time,section_id, template_id) "\
+ "VALUES ('%s', '%s', '%s', '%s', '%s','%s', '%s'," +\
+ " '%s', '%s');" % (str(
+ uuid4()), 'Parameters', '1',
+ 'Numeric parameters should include range ' +
+ 'and/or allowed values.', 'auto',
+ 'Here are some useful tips for how to validate this item ' +
+ 'in the most awesome way:<br><br><ul><li>Here is my awesome ' +
+ 'tip 1</li><li>Here is my awesome tip 2</li><li>Here is my ' +
+ 'awesome tip 3</li></ul>', timezone.now(), section2_id,
+ template_id)
DBGeneral.insert_query(line_item5)
@staticmethod
def create_editing_cl_template_if_not_exist():
- template_id = DBGeneral.select_query(("""SELECT uuid FROM public.ice_checklist_template where name = '{s}'""").format(
- s=Constants.Dashboard.LeftPanel.EditChecklistTemplate.HEAT))
+ template_id = DBGeneral.select_query(
+ ("""SELECT uuid FROM public.ice_checklist_template """ +
+ """where name = '{s}'""").format(
+ s=Constants.Dashboard.LeftPanel.EditChecklistTemplate.HEAT))
if template_id == 'None':
DBChecklist.create_default_heat_teampleate()
session.createTemplatecount = True
@@ -366,10 +496,14 @@ class DBChecklist:
"state", Constants.DBConstants.IceTables.CHECKLIST,
identify_field, field_value, "create_time")[0]
counter = 0
- while get_state != expected_state and counter <= Constants.DBConstants.RETRIES_NUMBER:
+ while get_state != expected_state and \
+ counter <= Constants.DBConstants.RETRIES_NUMBER:
time.sleep(session.wait_until_time_pause_long)
- logger.debug("Checklist state not changed yet , expecting state: %s, current result: %s (attempt %s of %s)" % (
- expected_state, get_state, counter, Constants.DBConstants.RETRIES_NUMBER))
+ logger.debug(
+ "Checklist state not changed yet ," +
+ "expecting state: %s, current result: %s (attempt %s of %s)" %
+ (expected_state, get_state, counter,
+ Constants.DBConstants.RETRIES_NUMBER))
counter += 1
get_state = DBGeneral.select_where_order_by_desc(
"state", Constants.DBConstants.IceTables.CHECKLIST,
@@ -380,7 +514,9 @@ class DBChecklist:
expected_state + ", and was verified over the DB")
return expected_state
raise Exception(
- "Expected checklist state never arrived " + expected_state, get_state)
+ "Expected checklist state never arrived " +
+ expected_state,
+ get_state)
@staticmethod
def get_recent_checklist_uuid(name):
diff --git a/services/database/db_cms.py b/services/database/db_cms.py
index 3c2b2c6..288121a 100644
--- a/services/database/db_cms.py
+++ b/services/database/db_cms.py
@@ -1,5 +1,4 @@
-
-# ============LICENSE_START==========================================
+# ============LICENSE_START==========================================
# org.onap.vvp/test-engine
# ===================================================================
# Copyright © 2017 AT&T Intellectual Property. All rights reserved.
@@ -39,17 +38,9 @@
import psycopg2
from wheel.signatures import assertTrue
-from services.constants import Constants
from services.database.db_general import DBGeneral
-from services.frontend.base_actions.click import Click
-from services.frontend.base_actions.enter import Enter
-from services.frontend.base_actions.wait import Wait
-from services.frontend.fe_dashboard import FEDashboard
-from services.frontend.fe_general import FEGeneral
-from services.frontend.fe_user import FEUser
from services.helper import Helper
from services.logging_service import LoggingServiceFactory
-from services.session import session
logger = LoggingServiceFactory.get_logger()
@@ -70,7 +61,7 @@ class DBCMS:
dbConn.close()
logger.debug("Insert query success!")
# If failed - count the failure and add the error to list of errors.
- except:
+ except BaseException:
raise Exception("Couldn't fetch answer using the given query.")
@staticmethod
@@ -86,7 +77,7 @@ class DBCMS:
dbConn.close()
logger.debug("Update query success!")
# If failed - count the failure and add the error to list of errors.
- except:
+ except BaseException:
raise Exception("Couldn't fetch answer using the given query.")
@staticmethod
@@ -107,14 +98,14 @@ class DBCMS:
logger.debug("Query result: " + str(result))
return result
# If failed - count the failure and add the error to list of errors.
- except:
+ except BaseException:
raise Exception("Couldn't fetch answer using the given query.")
@staticmethod
def get_cms_category_id(categoryName):
logger.debug("Get DBCMS category id for name: " + categoryName)
- queryStr = "SELECT id FROM public.blog_blogcategory WHERE title = '%s' LIMIT 1;" % (
- categoryName)
+ queryStr = "SELECT id FROM public.blog_blogcategory WHERE " +\
+ "title = '%s' LIMIT 1;" % (categoryName)
logger.debug("Query : " + queryStr)
result = DBCMS.select_query(queryStr)
return result
@@ -123,9 +114,17 @@ class DBCMS:
def insert_cms_new_post(title, description, categoryName):
logger.debug("Insert new post : " + title)
queryStr = "INSERT INTO public.blog_blogpost" \
- "(comments_count, keywords_string, rating_count, rating_sum, rating_average, title, slug, _meta_title, description, gen_description, created, updated, status, publish_date, expiry_date, short_url, in_sitemap, content, allow_comments, featured_image, site_id, user_id) "\
- "VALUES (0, '', 0, 0, 0, '%s', '%s-slug', '', '%s', true, current_timestamp - interval '1 day', current_timestamp - interval '2 day', 2, current_timestamp - interval '1 day', NULL, '', true, '<p>%s</p>', true, '', 1, 1);" % (
- title, title, description, description)
+ "(comments_count, keywords_string, rating_count, rating_sum, " +\
+ "rating_average, title, slug, _meta_title, description, " +\
+ "gen_description, created, updated, status, publish_date, " +\
+ "expiry_date, short_url, in_sitemap, content, allow_comments, " +\
+ "featured_image, site_id, user_id) "\
+ "VALUES (0, '', 0, 0, 0, " +\
+ "'%s', '%s-slug', " % (title, title) +\
+ "'', '%s', true, " % description +\
+ "current_timestamp - interval '1 day', current_timestamp - " +\
+ "interval '2 day', 2, current_timestamp - interval '1 day', " +\
+ "NULL, '', true, '<p>%s</p>', true, '', 1, 1);" % description
logger.debug("Query : " + queryStr)
DBCMS.insert_query(queryStr)
post_id = DBCMS.get_last_added_post_id()
@@ -144,9 +143,9 @@ class DBCMS:
@staticmethod
def update_days(xdays, title):
logger.debug("Get the id of the post inserted")
-# queryStr = "select MAX(id) FROM public.blog_blogpost;"
- queryStr = "UPDATE public.blog_blogpost SET created=current_timestamp - interval '%s day' WHERE title='%s';" % (
- xdays, title)
+ queryStr = "UPDATE public.blog_blogpost SET " +\
+ "created=current_timestamp - interval '%s day' " % xdays +\
+ "WHERE title='%s';" % title
logger.debug("Query : " + queryStr)
result = DBCMS.update_query(queryStr)
return result
@@ -154,15 +153,17 @@ class DBCMS:
@staticmethod
def add_category_to_post(postId, categoryId):
logger.debug("bind category into inserted post: " + postId)
- queryStr = "INSERT INTO public.blog_blogpost_categories(blogpost_id, blogcategory_id) VALUES (%s, %s);" % (
- postId, categoryId)
+ queryStr = "INSERT INTO public.blog_blogpost_categories" +\
+ "(blogpost_id, blogcategory_id) " +\
+ "VALUES (%s, %s);" % (postId, categoryId)
logger.debug("Query : " + queryStr)
DBCMS.insert_query(queryStr)
@staticmethod
def get_documentation_page_id():
logger.debug("Retrive id of documentation page: ")
- queryStr = "SELECT id FROM public.pages_page WHERE title = 'Documentation' LIMIT 1;"
+ queryStr = "SELECT id FROM public.pages_page WHERE " +\
+ "title = 'Documentation' LIMIT 1;"
logger.debug("Query : " + queryStr)
result = DBCMS.select_query(queryStr)
return result
@@ -191,17 +192,27 @@ class DBCMS:
if parent_id is None:
parent_id = DBCMS.get_documentation_page_id()
queryStr = "INSERT INTO public.pages_page(" \
- "keywords_string, title, slug, _meta_title, description, gen_description, created, updated, status, publish_date, expiry_date, short_url, in_sitemap, _order, in_menus, titles, content_model, login_required, parent_id, site_id)" \
- "VALUES ('', '%s', '%s-slug', '', '%s', true, current_timestamp - interval '1 day', current_timestamp - interval '1 day', 2, current_timestamp - interval '1 day', NULL, '', true, 0, '1,2,3', '%s', 'richtextpage', true, %s, 1);" % (
- title, title, content, title, parent_id)
+ "keywords_string, title, slug, _meta_title, description, " +\
+ "gen_description, created, updated, status, publish_date, " +\
+ "expiry_date, short_url, in_sitemap, _order, in_menus, titles, " +\
+ "content_model, login_required, parent_id, site_id)" \
+ "VALUES ('', " +\
+ "'%s', '%s-slug'" % (title, title) +\
+ ", '', '%s', true, " % content +\
+ "current_timestamp - interval '1 day', current_timestamp " +\
+ "- interval '1 day', 2, current_timestamp - interval '1 day', " +\
+ "NULL, '', true, 0, '1,2,3', " +\
+ "'%s', 'richtextpage', " % title +\
+ "true, %s, 1);" % parent_id
logger.debug("Query : " + queryStr)
DBCMS.insert_query(queryStr)
createdPageId = DBCMS.get_last_inserted_page_id()
logger.debug(
"Bind the page with the rich text content related to this page")
- queryStr = "INSERT INTO public.pages_richtextpage(page_ptr_id, content) VALUES (%s, '<p>%s</p>');" % (
- createdPageId, content)
+ queryStr = "INSERT INTO public.pages_richtextpage(page_ptr_id, " +\
+ "content) VALUES (%s, '<p>%s</p>');" % (
+ createdPageId, content)
logger.debug("Query : " + queryStr)
DBCMS.insert_query(queryStr)
return createdPageId
@@ -244,8 +255,10 @@ class DBCMS:
@staticmethod
def update_X_days_back_post(title, xdays):
logger.debug("Get the id of the post inserted")
- queryStr = "UPDATE blog_blogpost SET created = current_timestamp - interval '%s day', publish_date=current_timestamp - interval '%s day' WHERE title= '%s' ;" % (
- xdays, xdays, title)
+ queryStr = "UPDATE blog_blogpost SET created = current_timestamp" +\
+ " - interval '%s day', " % xdays +\
+ "publish_date=current_timestamp - " +\
+ "interval '%s day' WHERE title= '%s' ;" % (xdays, title)
logger.debug("Query : " + queryStr)
DBCMS.update_query(queryStr)
diff --git a/services/database/db_general.py b/services/database/db_general.py
index c850d3a..2c83fb0 100755
--- a/services/database/db_general.py
+++ b/services/database/db_general.py
@@ -1,5 +1,4 @@
-
-# ============LICENSE_START==========================================
+# ============LICENSE_START==========================================
# org.onap.vvp/test-engine
# ===================================================================
# Copyright © 2017 AT&T Intellectual Property. All rights reserved.
@@ -48,22 +47,31 @@ from services.logging_service import LoggingServiceFactory
logger = LoggingServiceFactory.get_logger()
+
class DBGeneral:
@staticmethod
# desigredDB: Use 'default' for CI General and 'em_db' for EM General
# (according to settings.DATABASES).
def return_db_native_connection(desigredDB):
- dbConnectionStr = "dbname='" + str(settings.SINGLETONE_DB[desigredDB]['NAME']) + \
+ dbConnectionStr = "dbname='" + str(
+ settings.SINGLETONE_DB[desigredDB]['NAME']) + \
"' user='" + str(settings.SINGLETONE_DB[desigredDB]['USER']) + \
"' host='" + str(settings.SINGLETONE_DB[desigredDB]['HOST']) + \
- "' password='" + str(settings.SINGLETONE_DB[desigredDB]['PASSWORD']) + \
+ "' password='" + str(
+ settings.SINGLETONE_DB[desigredDB]['PASSWORD']) + \
"' port='" + \
str(settings.SINGLETONE_DB[desigredDB]['PORT']) + "'"
return dbConnectionStr
@staticmethod
- def insert_results(testType, testFeature, testResult, testName, testDuration, notes=" "):
+ def insert_results(
+ testType,
+ testFeature,
+ testResult,
+ testName,
+ testDuration,
+ notes=" "):
try:
if settings.DATABASE_TYPE == 'sqlite':
dbfile = str(settings.DATABASES['default']['TEST_NAME'])
@@ -80,13 +88,20 @@ class DBGeneral:
raise Exception(errorMsg)
try: # Create INSERT query.
if settings.DATABASE_TYPE == 'sqlite':
- query_str = 'INSERT INTO ice_test_results (testType, testFeature, testResult, testName, notes,'\
- 'create_time, build_id, duration) VALUES (?, ?, ?, ?, ?, ?, ?, ?);'
+ query_str = 'INSERT INTO ice_test_results ' +\
+ '(testType, testFeature, testResult, testName, notes,'\
+ 'create_time, build_id, duration) VALUES ' +\
+ '(?, ?, ?, ?, ?, ?, ?, ?);'
else:
- query_str = 'INSERT INTO ice_test_results ("testType", "testFeature", "testResult", "testName", notes,'\
- 'create_time, build_id, duration) VALUES (%s, %s, %s, %s, %s, %s, %s, %s);'
- cur.execute(query_str, (testType, testFeature, testResult, testName, notes, str(datetime.now()),
- settings.ICE_BUILD_REPORT_NUM, testDuration))
+ query_str = 'INSERT INTO ice_test_results ("testType", ' +\
+ '"testFeature", "testResult", "testName", notes,'\
+ 'create_time, build_id, duration) VALUES ' +\
+ '(%s, %s, %s, %s, %s, %s, %s, %s);'
+ cur.execute(query_str, (testType, testFeature, testResult,
+ testName, notes,
+ str(datetime.now()),
+ settings.ICE_BUILD_REPORT_NUM,
+ testDuration))
dbConn.commit()
logger.debug("Test result in DB - " + testResult)
except Exception as e:
@@ -123,7 +138,7 @@ class DBGeneral:
dbConn.close()
logger.debug("Query result: " + str(result))
return result
- except:
+ except BaseException:
raise Exception("Couldn't fetch answer using the given query.")
@staticmethod
@@ -182,7 +197,7 @@ class DBGeneral:
logger.debug("Query result: " + str(result))
return result
# If failed - count the failure and add the error to list of errors.
- except:
+ except BaseException:
errorMsg = "select_where_email FAILED "
raise Exception(errorMsg, "select_where_email")
raise
@@ -213,13 +228,19 @@ class DBGeneral:
raise Exception(errorMsg, "select_from")
@staticmethod
- def select_where(queryColumnName, queryTableName, whereParametrType, whereParametrValue, fetchNum):
+ def select_where(
+ queryColumnName,
+ queryTableName,
+ whereParametrType,
+ whereParametrValue,
+ fetchNum):
try:
dbConn = psycopg2.connect(
DBGeneral.return_db_native_connection('em_db'))
cur = dbConn.cursor()
queryStr = "select %s from %s Where %s = '%s';" % (
- queryColumnName, queryTableName, whereParametrType, whereParametrValue)
+ queryColumnName, queryTableName, whereParametrType,
+ whereParametrValue)
logger.debug("Query : " + queryStr)
cur.execute(queryStr)
if (fetchNum == 0):
@@ -234,17 +255,24 @@ class DBGeneral:
logger.debug("Query result: " + str(result))
return result
# If failed - count the failure and add the error to list of errors.
- except:
+ except BaseException:
errorMsg = "select_where FAILED "
raise Exception(errorMsg, "select_where")
@staticmethod
- def select_where_order_by_desc(queryColumnName, queryTableName, whereParametrType, whereParametrValue, order_by):
+ def select_where_order_by_desc(
+ queryColumnName,
+ queryTableName,
+ whereParametrType,
+ whereParametrValue,
+ order_by):
dbConn = psycopg2.connect(
DBGeneral.return_db_native_connection('em_db'))
cur = dbConn.cursor()
- queryStr = "select %s from %s Where %s = '%s' order by %s desc limit 1;" \
- % (queryColumnName, queryTableName, whereParametrType, whereParametrValue, order_by)
+ queryStr = \
+ "select %s from %s " % (queryColumnName, queryTableName,) +\
+ "Where %s = '%s' " % (whereParametrType, whereParametrValue) +\
+ "order by %s desc limit 1;" % order_by
logger.debug("Query : " + queryStr)
cur.execute(queryStr)
result = str(cur.fetchall())
@@ -274,14 +302,22 @@ class DBGeneral:
return result
@staticmethod
- def select_where_not_and_order_by_desc(queryColumnName, queryTableName, whereParametrType,
- whereParametrValue, parametrTypeAnd, parametrAnd, order_by):
+ def select_where_not_and_order_by_desc(
+ queryColumnName,
+ queryTableName,
+ whereParametrType,
+ whereParametrValue,
+ parametrTypeAnd,
+ parametrAnd,
+ order_by):
dbConn = psycopg2.connect(
DBGeneral.return_db_native_connection('em_db'))
cur = dbConn.cursor()
- queryStr = "select %s from %s Where %s = '%s' and %s != '%s' order by %s desc limit 1;" \
- % (queryColumnName, queryTableName, whereParametrType, whereParametrValue,
- parametrTypeAnd, parametrAnd, order_by)
+ queryStr = \
+ "select %s from %s " % (queryColumnName, queryTableName) +\
+ "Where %s = '%s' " % (whereParametrType, whereParametrValue) +\
+ "and %s != '%s' " % (parametrTypeAnd, parametrAnd) +\
+ "order by %s desc limit 1;" % order_by
logger.debug("Query : " + queryStr)
cur.execute(queryStr)
result = str(cur.fetchall())
@@ -290,15 +326,22 @@ class DBGeneral:
return result
@staticmethod
- def select_where_and(queryColumnName, queryTableName, whereParametrType, whereParametrValue,
- parametrTypeAnd, parametrAnd, fetchNum):
+ def select_where_and(
+ queryColumnName,
+ queryTableName,
+ whereParametrType,
+ whereParametrValue,
+ parametrTypeAnd,
+ parametrAnd,
+ fetchNum):
try:
dbConn = psycopg2.connect(
DBGeneral.return_db_native_connection('em_db'))
dbConn = dbConn
cur = dbConn.cursor()
queryStr = "select %s from %s Where %s = '%s' and %s = '%s';" % (
- queryColumnName, queryTableName, whereParametrType, whereParametrValue, parametrTypeAnd, parametrAnd)
+ queryColumnName, queryTableName, whereParametrType,
+ whereParametrValue, parametrTypeAnd, parametrAnd)
logger.debug("Query : " + queryStr)
cur.execute(queryStr)
if (fetchNum == 0):
@@ -314,19 +357,27 @@ class DBGeneral:
logger.debug("Query result: " + str(result))
return result
# If failed - count the failure and add the error to list of errors.
- except:
+ except BaseException:
errorMsg = "select_where_and FAILED "
raise Exception(errorMsg, "select_where_and")
@staticmethod
- def select_where_is_bigger(queryColumnName, queryTableName, whereParametrType, whereParametrValue, parametrTypeAnd, parametrAnd, fetchNum):
+ def select_where_is_bigger(
+ queryColumnName,
+ queryTableName,
+ whereParametrType,
+ whereParametrValue,
+ parametrTypeAnd,
+ parametrAnd,
+ fetchNum):
try:
dbConn = psycopg2.connect(
DBGeneral.return_db_native_connection('em_db'))
dbConn = dbConn
cur = dbConn.cursor()
queryStr = "select %s from %s Where %s = '%s' and %s > %s;" % (
- queryColumnName, queryTableName, whereParametrType, whereParametrValue, parametrTypeAnd, parametrAnd)
+ queryColumnName, queryTableName, whereParametrType,
+ whereParametrValue, parametrTypeAnd, parametrAnd)
logger.debug("Query : " + queryStr)
cur.execute(queryStr)
if (fetchNum == 0):
@@ -340,19 +391,25 @@ class DBGeneral:
dbConn.close()
return result
# If failed - count the failure and add the error to list of errors.
- except:
+ except BaseException:
errorMsg = "select_where_is_bigger FAILED "
raise Exception(errorMsg, "select_where_is_bigger")
@staticmethod
- def update_where(queryTableName, setParametrType, setparametrValue, whereParametrType, whereParametrValue):
+ def update_where(
+ queryTableName,
+ setParametrType,
+ setparametrValue,
+ whereParametrType,
+ whereParametrValue):
try:
dbConn = psycopg2.connect(
DBGeneral.return_db_native_connection('em_db'))
dbConn = dbConn
cur = dbConn.cursor()
queryStr = "UPDATE %s SET %s = '%s' Where %s = '%s';" % (
- queryTableName, setParametrType, setparametrValue, whereParametrType, whereParametrValue)
+ queryTableName, setParametrType, setparametrValue,
+ whereParametrType, whereParametrValue)
cur.execute(queryStr)
dbConn.commit()
logger.debug("Query : " + queryStr)
@@ -376,25 +433,37 @@ class DBGeneral:
dbConn.commit()
dbConn.close()
# If failed - count the failure and add the error to list of errors.
- except:
+ except BaseException:
errorMsg = "Could not Update User"
raise Exception(errorMsg, "Update")
@staticmethod
- def update_where_and(queryTableName, setParametrType, parametrValue, changeToValue, whereParametrType, setParametrType2, setParametrValue2, whereParametrType2, whereParametrValue2):
+ def update_where_and(
+ queryTableName,
+ setParametrType,
+ parametrValue,
+ changeToValue,
+ whereParametrType,
+ setParametrType2,
+ setParametrValue2,
+ whereParametrType2,
+ whereParametrValue2):
try:
dbConn = psycopg2.connect(
DBGeneral.return_db_native_connection('em_db'))
dbConn = dbConn
cur = dbConn.cursor()
- queryStr = "UPDATE %s SET %s = '%s', %s = '%s' Where %s = '%s' and %s = '%s';" % (
- queryTableName, setParametrType, changeToValue, setParametrType2, setParametrValue2, whereParametrType, parametrValue, whereParametrType2, whereParametrValue2)
+ queryStr = "UPDATE %s SET " % queryTableName +\
+ "%s = '%s', " % (setParametrType, changeToValue) +\
+ "%s = '%s' Where " % (setParametrType2, setParametrValue2) +\
+ "%s = '%s' " % (whereParametrType, parametrValue) +\
+ "and %s = '%s';" % (whereParametrType2, whereParametrValue2)
logger.debug("Query : " + queryStr)
cur.execute(queryStr)
dbConn.commit()
dbConn.close()
# If failed - count the failure and add the error to list of errors.
- except:
+ except BaseException:
errorMsg = "Could not Update User"
raise Exception(errorMsg, "Update")
diff --git a/services/database/db_user.py b/services/database/db_user.py
index d347dd2..10d02ff 100644
--- a/services/database/db_user.py
+++ b/services/database/db_user.py
@@ -1,5 +1,5 @@
-
-# ============LICENSE_START==========================================
+
+# ============LICENSE_START==========================================
# org.onap.vvp/test-engine
# ===================================================================
# Copyright © 2017 AT&T Intellectual Property. All rights reserved.
@@ -52,6 +52,7 @@ from services.session import session
logger = LoggingServiceFactory.get_logger()
+
class DBUser:
@staticmethod
@@ -69,24 +70,39 @@ class DBUser:
return activationUrl
@staticmethod
- def get_contact_signup_url(invite_token, uuid, email, fullName, phoneNum, companyName):
+ def get_contact_signup_url(
+ invite_token,
+ uuid,
+ email,
+ fullName,
+ phoneNum,
+ companyName):
companyId = DBGeneral.select_where(
"uuid", "ice_vendor", "name", companyName, 1)
- signUpURLforContact = settings.ICE_PORTAL_URL + "#/signUp?invitation=" + invite_token + \
+ signUpURLforContact = settings.ICE_PORTAL_URL + \
+ "#/signUp?invitation=" + invite_token + \
"&email=" + email + "&full_name=" + fullName + \
"&phone_number=" + phoneNum + "&company=" + companyId
logger.debug("SignUpURLforContact :" + signUpURLforContact)
return signUpURLforContact
@staticmethod
- def select_invitation_token(queryColumnName, queryTableName, whereParametrType, whereParametrValue, email, fetchNum):
+ def select_invitation_token(
+ queryColumnName,
+ queryTableName,
+ whereParametrType,
+ whereParametrValue,
+ email,
+ fetchNum):
try:
dbConn = psycopg2.connect(
DBGeneral.return_db_native_connection('em_db'))
dbConn = dbConn
cur = dbConn.cursor()
- queryStr = "select %s from %s Where %s = '%s' and email = '%s' ;" % (
- queryColumnName, queryTableName, whereParametrType, whereParametrValue, email)
+ queryStr = \
+ "select %s from %s Where %s = '%s' and email = '%s' ;" % (
+ queryColumnName, queryTableName, whereParametrType,
+ whereParametrValue, email)
logger.debug("Query : " + queryStr)
cur.execute(queryStr)
if (fetchNum == 0):
@@ -98,14 +114,14 @@ class DBUser:
elif(result.find(",)") != -1): # formatting ints e.g id
result = result.partition('(')[-1].rpartition(',')[0]
dbConn.close()
- if result == None:
+ if result is None:
errorMsg = "select_where_pr_state FAILED "
logger.error(errorMsg)
raise
logger.debug("Query result: " + str(result))
return result
# If failed - count the failure and add the error to list of errors.
- except:
+ except BaseException:
errorMsg = "select_where FAILED "
raise Exception(errorMsg, "select_where")
@@ -115,27 +131,30 @@ class DBUser:
# Fetch one AT&T user ID.
engagement_id = DBVirtualFunction.select_eng_uuid(vfName)
engagement_manual_id = DBGeneral.select_where(
- "engagement_manual_id", "ice_engagement", "uuid", engagement_id, 1)
+ "engagement_manual_id", "ice_engagement", "uuid",
+ engagement_id, 1)
reviewer_id = DBGeneral.select_where(
- "reviewer_id", "ice_engagement", "engagement_manual_id", engagement_manual_id, 1)
+ "reviewer_id",
+ "ice_engagement",
+ "engagement_manual_id",
+ engagement_manual_id,
+ 1)
engLeadFullName = DBGeneral.select_where_and(
- "full_name", "ice_user_profile", "id", reviewer_id, "role_id", "2", 1)
+ "full_name", "ice_user_profile", "id", reviewer_id,
+ "role_id", "2", 1)
return engLeadFullName
# If failed - count the failure and add the error to list of errors.
- except:
+ except BaseException:
errorMsg = "get_el_name FAILED "
raise Exception(errorMsg, "get_el_name")
@staticmethod
def get_email_by_full_name(fullname):
# try:
- query_str = "select email from ice_user_profile where full_name = '%s';" % (
- fullname)
+ query_str = "select email from ice_user_profile where " +\
+ "full_name = '%s';" % (fullname)
user_email = DBGeneral.select_query(query_str)
return user_email
-# except: # If failed - count the failure and add the error to list of errors.
-# errorMsg = "get_email_by_full_name FAILED "
-# raise Exception(errorMsg, "get_el_name")
@staticmethod
def select_recent_vf_of_user(user_uuid, fetchNum):
@@ -144,8 +163,9 @@ class DBUser:
DBGeneral.return_db_native_connection('em_db'))
dbConn = dbConn
cur = dbConn.cursor()
- queryStr = "SELECT vf_id FROM public.ice_recent_engagement where user_uuid = '%s' order by last_update desc limit 20;" % (
- user_uuid)
+ queryStr = "SELECT vf_id FROM public.ice_recent_engagement " +\
+ "where user_uuid = '%s' order by last_update " % user_uuid +\
+ "desc limit 20;"
logger.debug("Query : " + queryStr)
cur.execute(queryStr)
if (fetchNum == 0):
@@ -160,7 +180,7 @@ class DBUser:
logger.debug("Query result: " + str(result))
return result
# If failed - count the failure and add the error to list of errors.
- except:
+ except BaseException:
errorMsg = "select_where FAILED "
raise Exception(errorMsg, "select_where")
@@ -170,14 +190,20 @@ class DBUser:
# Fetch one AT&T user ID.
engagement_id = DBVirtualFunction.select_eng_uuid(vfName)
engagement_manual_id = DBGeneral.select_where(
- "engagement_manual_id", "ice_engagement", "uuid", engagement_id, 1)
+ "engagement_manual_id", "ice_engagement", "uuid",
+ engagement_id, 1)
reviewer_id = DBGeneral.select_where(
- "reviewer_id", "ice_engagement", "engagement_manual_id", engagement_manual_id, 1)
+ "reviewer_id",
+ "ice_engagement",
+ "engagement_manual_id",
+ engagement_manual_id,
+ 1)
engLeadEmail = DBGeneral.select_where_and(
- "email", "ice_user_profile", "id", reviewer_id, "role_id", "2", 1)
+ "email", "ice_user_profile", "id", reviewer_id, "role_id",
+ "2", 1)
return engLeadEmail
# If failed - count the failure and add the error to list of errors.
- except:
+ except BaseException:
errorMsg = "select_el_email FAILED "
raise Exception(errorMsg, "select_el_email")
@@ -188,14 +214,15 @@ class DBUser:
engLeadId = DBUser.select_user_profile_property(email, "id")
return engLeadId
# If failed - count the failure and add the error to list of errors.
- except:
+ except BaseException:
errorMsg = "select_user_native_id FAILED "
raise Exception(errorMsg, "select_user_native_id")
@staticmethod
def select_personal_next_step(email):
user_id = DBUser.select_user_native_id(email)
- return DBGeneral.select_where("uuid", "ice_next_step", "owner_id", user_id, 1)
+ return DBGeneral.select_where(
+ "uuid", "ice_next_step", "owner_id", user_id, 1)
@staticmethod
def select_pr_email(vfName):
@@ -203,14 +230,19 @@ class DBUser:
# Fetch one AT&T user ID.
engagement_id = DBVirtualFunction.select_eng_uuid(vfName)
engagement_manual_id = DBGeneral.select_where(
- "engagement_manual_id", "ice_engagement", "uuid", engagement_id, 1)
+ "engagement_manual_id", "ice_engagement", "uuid",
+ engagement_id, 1)
reviewer_id = DBGeneral.select_where(
- "peer_reviewer_id", "ice_engagement", "engagement_manual_id", engagement_manual_id, 1)
+ "peer_reviewer_id",
+ "ice_engagement",
+ "engagement_manual_id",
+ engagement_manual_id,
+ 1)
engLeadEmail = DBGeneral.select_where(
"email", "ice_user_profile", "id", reviewer_id, 1)
return engLeadEmail
# If failed - count the failure and add the error to list of errors.
- except:
+ except BaseException:
errorMsg = "select_el_email FAILED "
raise Exception(errorMsg, "select_el_email")
@@ -223,16 +255,30 @@ class DBUser:
return notifIDs
@staticmethod
- def get_not_seen_notifications_number_by_email(user_email, is_negative=False):
+ def get_not_seen_notifications_number_by_email(
+ user_email, is_negative=False):
user_id = DBGeneral.select_where_email(
"id", Constants.DBConstants.IceTables.USER_PROFILE, user_email)
notifications_number = DBGeneral.select_where_and(
- Constants.DBConstants.Queries.COUNT, Constants.DBConstants.IceTables.NOTIFICATION, "user_id", user_id, "is_read", "False", 1)
+ Constants.DBConstants.Queries.COUNT,
+ Constants.DBConstants.IceTables.NOTIFICATION,
+ "user_id",
+ user_id,
+ "is_read",
+ "False",
+ 1)
if is_negative:
counter = 0
- while notifications_number != "0" and counter <= Constants.Dashboard.Avatar.Notifications.Count.RETRIES_NUMBER:
+ while notifications_number != "0" and counter <= Constants.\
+ Dashboard.Avatar.Notifications.Count.RETRIES_NUMBER:
notifications_number = DBGeneral.select_where_and(
- Constants.DBConstants.Queries.COUNT, Constants.DBConstants.IceTables.NOTIFICATION, "user_id", user_id, "is_read", "False", 1)
+ Constants.DBConstants.Queries.COUNT,
+ Constants.DBConstants.IceTables.NOTIFICATION,
+ "user_id",
+ user_id,
+ "is_read",
+ "False",
+ 1)
time.sleep(1)
counter += 1
return notifications_number
@@ -240,9 +286,14 @@ class DBUser:
@staticmethod
def get_eng_lead_email_per_enguuid(enguuid):
reviewer_id = DBGeneral.select_where(
- "reviewer_id", Constants.DBConstants.IceTables.ENGAGEMENT, "uuid", enguuid, 1)
+ "reviewer_id",
+ Constants.DBConstants.IceTables.ENGAGEMENT,
+ "uuid",
+ enguuid,
+ 1)
engLeadEmail = DBGeneral.select_where(
- "email", Constants.DBConstants.IceTables.USER_PROFILE, "id", reviewer_id, 1)
+ "email", Constants.DBConstants.IceTables.USER_PROFILE, "id",
+ reviewer_id, 1)
return engLeadEmail
@staticmethod
@@ -252,8 +303,12 @@ class DBUser:
DBGeneral.return_db_native_connection('em_db'))
dbConn = dbConn
cur = dbConn.cursor()
- queryStr = "select COUNT(*) from ice_engagement_engagement_team Where iceuserprofile_id = %s and (select engagement_stage from public.ice_engagement where uuid = engagement_id LIMIT 1) != 'Archived';" % (
- engLeadID)
+ queryStr = "select COUNT(*) from ice_engagement_engagement_team" +\
+ " Where iceuserprofile_id = %s" % engLeadID +\
+ " and (select " +\
+ "engagement_stage from public.ice_engagement " +\
+ "where uuid = engagement_id LIMIT 1) != 'Archived';"
+
logger.debug("Query : " + queryStr)
cur.execute(queryStr)
result = cur.fetchall()
@@ -262,7 +317,7 @@ class DBUser:
logger.debug(result[0][0])
return result[0][0]
# If failed - count the failure and add the error to list of errors.
- except:
+ except BaseException:
errorMsg = "select_user_engagements_by_stage FAILED "
raise Exception(errorMsg, "select_user_engagements_by_stage")
@@ -273,8 +328,14 @@ class DBUser:
DBGeneral.return_db_native_connection('em_db'))
dbConn = dbConn
cur = dbConn.cursor()
- queryStr = "select count(*) from ice_engagement INNER JOIN ice_engagement_engagement_team ON ice_engagement_engagement_team.engagement_id= ice_engagement.uuid Where (ice_engagement.engagement_stage = '%s') and (ice_engagement_engagement_team.iceuserprofile_id = %s );" % (
- stage, engLeadID)
+ queryStr = "select count(*) from ice_engagement INNER JOIN " +\
+ "ice_engagement_engagement_team ON " +\
+ "ice_engagement_engagement_team.engagement_id= " +\
+ "ice_engagement.uuid Where " +\
+ "(ice_engagement.engagement_stage " +\
+ "= '%s') and " % stage +\
+ "(ice_engagement_engagement_team.iceuserprofile_id = " +\
+ "%s );" % engLeadID
logger.debug("Query : " + queryStr)
cur.execute(queryStr)
result = cur.fetchall()
@@ -283,7 +344,7 @@ class DBUser:
logger.debug(result[0][0])
return result[0][0]
# If failed - count the failure and add the error to list of errors.
- except:
+ except BaseException:
errorMsg = "select_user_engagements_by_stage FAILED "
raise Exception(errorMsg, "select_user_engagements_by_stage")
@@ -294,7 +355,11 @@ class DBUser:
# Fetch one user ID.
index = DBGeneral.select_where_email("id", "auth_user", email)
DBGeneral.update_where(
- "ice_custom_user", "temp_password", encodePass, "user_ptr_id", index)
+ "ice_custom_user",
+ "temp_password",
+ encodePass,
+ "user_ptr_id",
+ index)
@staticmethod
def set_password_to_default(email):
@@ -305,8 +370,9 @@ class DBUser:
@staticmethod
def select_el_not_in_engagement(el_name, pr_name):
- query_str = "select full_name from ice_user_profile where role_id = 2 and full_name != '%s' and full_name != '%s';" % (
- el_name, pr_name)
+ query_str = "select full_name from ice_user_profile where " +\
+ "role_id = 2 and full_name != '%s' and full_name != '%s';" % (
+ el_name, pr_name)
new_user = DBGeneral.select_query(query_str)
if new_user == 'None':
new_user = DBUser.update_to_el_not_in_engagement()
@@ -316,53 +382,65 @@ class DBUser:
def select_user_uuid(email):
user_uuid = DBUser.select_user_profile_property(email, "uuid")
return user_uuid
-
+
@staticmethod
def select_access_key(email):
- access_key = DBUser.select_user_profile_property(email, "rgwa_access_key")
+ access_key = DBUser.select_user_profile_property(
+ email, "rgwa_access_key")
return access_key
-
+
@staticmethod
def select_secret_key(email):
- secret_key = DBUser.select_user_profile_property(email, "rgwa_secret_key")
+ secret_key = DBUser.select_user_profile_property(
+ email, "rgwa_secret_key")
return secret_key
-
+
@staticmethod
def update_to_el_not_in_engagement():
query_str = "select uuid from ice_user_profile where role_id = 1 ;"
user_uuid = DBGeneral.select_query(query_str)
- updatequery = "UPDATE ice_user_profile SET role_id=2 ,full_name = 'el_for_test' WHERE uuid = '%s' ;" % (
- user_uuid)
+ updatequery = "UPDATE ice_user_profile SET role_id=2 ,full_name" +\
+ " = 'el_for_test' WHERE uuid = '%s' ;" % (
+ user_uuid)
DBGeneral.update_query(updatequery)
- updatequery = "UPDATE ice_user_profile SET role_id=2 WHERE full_name = '%s' ;" % (
- 'el_for_test')
+ updatequery = "UPDATE ice_user_profile SET role_id=2 WHERE " +\
+ "full_name = '%s' ;" % (
+ 'el_for_test')
DBGeneral.update_query(updatequery)
return 'el_for_test'
@staticmethod
def rollback_for_el_not_in_engagement():
- query_str = "select uuid from ice_user_profile where full_name = 'el_for_test';"
+ query_str = "select uuid from ice_user_profile where full_name = " +\
+ "'el_for_test';"
user_uuid = DBGeneral.select_query(query_str)
fullName = DBBridge.helper_rand_string("randomString")
- updatequery = "UPDATE ice_user_profile SET role_id=1,full_name = '%s' WHERE uuid = '%s' ;" % (
- fullName, user_uuid)
+ updatequery = "UPDATE ice_user_profile SET role_id=1,full_name " +\
+ "= '%s' WHERE uuid = '%s' ;" % (fullName, user_uuid)
DBGeneral.update_query(updatequery)
@staticmethod
def set_engagement_peer_reviewer(engagement_uuid, email):
user_uuid = DBUser.select_user_uuid(email)
- update_query = "UPDATE ice_user_profile SET role_id=2 WHERE uuid = '%s';" % user_uuid
+ update_query = "UPDATE ice_user_profile SET role_id=2 WHERE " +\
+ "uuid = '%s';" % user_uuid
DBGeneral.update_query(update_query)
user_id = DBGeneral.select_query(
"SELECT id FROM ice_user_profile WHERE uuid = '%s';" % user_uuid)
- update_query = "UPDATE ice_engagement SET peer_reviewer_id=%s WHERE uuid = '%s';" % (
- user_id, engagement_uuid)
+ update_query = "UPDATE ice_engagement SET peer_reviewer_id=%s " +\
+ "WHERE uuid = '%s';" % (
+ user_id, engagement_uuid)
DBGeneral.update_query(update_query)
@staticmethod
def select_user_profile_property(user_email, property_name):
- return DBGeneral.select_where(property_name, "ice_user_profile", "email", user_email, 1)
+ return DBGeneral.select_where(
+ property_name,
+ "ice_user_profile",
+ "email",
+ user_email,
+ 1)
@staticmethod
def validate_user_profile_settings_in_db(user_email, checked):
@@ -391,13 +469,23 @@ class DBUser:
def get_access_key(user_uuid):
counter = 0
access_key = DBGeneral.select_where(
- "rgwa_access_key", Constants.DBConstants.IceTables.USER_PROFILE, "uuid", user_uuid, 1)
- while access_key == "None" and counter <= Constants.RGWAConstants.RETRIES_NUMBER:
+ "rgwa_access_key",
+ Constants.DBConstants.IceTables.USER_PROFILE,
+ "uuid",
+ user_uuid,
+ 1)
+ while access_key == "None" and counter <= \
+ Constants.RGWAConstants.RETRIES_NUMBER:
time.sleep(session.wait_until_time_pause)
logger.debug(
- "rgwa_access_key are not ready yet, trying again (%s of 20)" % counter)
+ "rgwa_access_key are not ready yet, trying again (%s of 20)" %
+ counter)
access_key = DBGeneral.select_where(
- "rgwa_access_key", Constants.DBConstants.IceTables.USER_PROFILE, "uuid", user_uuid, 1)
+ "rgwa_access_key",
+ Constants.DBConstants.IceTables.USER_PROFILE,
+ "uuid",
+ user_uuid,
+ 1)
counter += 1
return access_key
@@ -405,13 +493,23 @@ class DBUser:
def get_access_secret(user_uuid):
counter = 0
access_secret = DBGeneral.select_where(
- "rgwa_secret_key", Constants.DBConstants.IceTables.USER_PROFILE, "uuid", user_uuid, 1)
- while access_secret == "None" and counter <= Constants.RGWAConstants.RETRIES_NUMBER:
+ "rgwa_secret_key",
+ Constants.DBConstants.IceTables.USER_PROFILE,
+ "uuid",
+ user_uuid,
+ 1)
+ while access_secret == "None" and counter <= Constants.\
+ RGWAConstants.RETRIES_NUMBER:
time.sleep(session.wait_until_time_pause)
logger.debug(
- "rgwa_secret_key are not ready yet, trying again (%s of 100)" % counter)
+ "rgwa_secret_key are not ready yet, trying again (%s of 100)" %
+ counter)
access_secret = DBGeneral.select_where(
- "rgwa_secret_key", Constants.DBConstants.IceTables.USER_PROFILE, "uuid", user_uuid, 1)
-
+ "rgwa_secret_key",
+ Constants.DBConstants.IceTables.USER_PROFILE,
+ "uuid",
+ user_uuid,
+ 1)
+
counter += 1
return access_secret
diff --git a/services/database/db_virtual_function.py b/services/database/db_virtual_function.py
index 143bca2..f61d1b7 100644
--- a/services/database/db_virtual_function.py
+++ b/services/database/db_virtual_function.py
@@ -1,5 +1,5 @@
-
-# ============LICENSE_START==========================================
+
+# ============LICENSE_START==========================================
# org.onap.vvp/test-engine
# ===================================================================
# Copyright © 2017 AT&T Intellectual Property. All rights reserved.
@@ -49,6 +49,7 @@ from services.logging_service import LoggingServiceFactory
logger = LoggingServiceFactory.get_logger()
+
class DBVirtualFunction:
@staticmethod
@@ -67,8 +68,10 @@ class DBVirtualFunction:
try:
logger.debug("DATABASE_TYPE: " + settings.DATABASE_TYPE)
# Create INSERT query.
- queryStr = "INSERT INTO %s (""uuid, name, weight, ui_visibility"") VALUES ('%s', '%s', '%s', '%s');" % (
- queryTableName, uuid, name, 0, ui_visibility)
+ queryStr = "INSERT INTO %s " % queryTableName +\
+ "(""uuid, name, weight, ui_visibility"") VALUES " +\
+ "('%s', '%s', " % (uuid, name) +\
+ "'%s', '%s');" % (0, ui_visibility)
logger.debug("Query: " + queryStr)
cur.execute(queryStr) # Execute query.
dbConn.commit()
@@ -100,7 +103,8 @@ class DBVirtualFunction:
dbConn.commit()
logger.debug("Test results are in General now.")
except Exception as e:
- errorMsg = "Failed to delete ECOMP release from General . because :" + \
+ errorMsg = "Failed to delete ECOMP release from General ." +\
+ " because :" + \
str(e)
raise Exception(errorMsg)
raise
@@ -114,8 +118,10 @@ class DBVirtualFunction:
@staticmethod
def select_next_steps_uuids_by_stage(engagement_uuid, engagement_stage):
- query = "SELECT uuid FROM %s WHERE engagement_id='%s' AND engagement_stage='%s' ORDER BY position;" % (
- Constants.DBConstants.IceTables.NEXT_STEP, engagement_uuid, engagement_stage)
+ query = "SELECT uuid FROM %s WHERE " % (
+ Constants.DBConstants.IceTables.NEXT_STEP) + "engagement_id=" +\
+ "'%s' AND engagement_stage='%s' ORDER BY position;" % (
+ engagement_uuid, engagement_stage)
return DBGeneral.select_query(query, "list", 0)
@staticmethod
@@ -125,11 +131,17 @@ class DBVirtualFunction:
@staticmethod
def select_next_step_description(next_step_uuid):
- return DBGeneral.select_where("description", "ice_next_step", "uuid", next_step_uuid, 1)
+ return DBGeneral.select_where(
+ "description",
+ "ice_next_step",
+ "uuid",
+ next_step_uuid,
+ 1)
@staticmethod
def select_eng_uuid(vf_name):
- return DBGeneral.select_where("engagement_id", "ice_vf", "name", vf_name, 1)
+ return DBGeneral.select_where(
+ "engagement_id", "ice_vf", "name", vf_name, 1)
@staticmethod
def select_engagment_uuid_by_vf_name(vfName):
@@ -138,7 +150,11 @@ class DBVirtualFunction:
engagement_manual_id = DBGeneral.select_where(
"engagement_manual_id", "ice_engagement", "uuid", engagement_id, 1)
enguuid = DBGeneral.select_where(
- "uuid", "ice_engagement", "engagement_manual_id", engagement_manual_id, 1)
+ "uuid",
+ "ice_engagement",
+ "engagement_manual_id",
+ engagement_manual_id,
+ 1)
return enguuid
@staticmethod
@@ -149,7 +165,8 @@ class DBVirtualFunction:
@staticmethod
def select_vf_name_by_vf_version(version_name):
- queryofname = "SELECT name FROM ice_vf WHERE version= '%s';" % version_name
+ queryofname = "SELECT name FROM ice_vf WHERE " +\
+ "version= '%s';" % version_name
vfNameDb = str(DBGeneral.select_query(queryofname))
return vfNameDb
@@ -164,64 +181,106 @@ class DBVirtualFunction:
@staticmethod
def get_engagement():
- """Use this function instead of creating a new engagement where no need to"""
- queryStr = "SELECT DISTINCT ice_engagement.uuid, engagement_manual_id, ice_vf.name, ice_user_profile.full_name, \
- ice_user_profile.email, reviewer_table.full_name, reviewer_table.email, \
+ """Use this function instead of creating a new """ +\
+ """engagement where no need to"""
+ queryStr = "SELECT DISTINCT ice_engagement.uuid, " +\
+ "engagement_manual_id, ice_vf.name, ice_user_profile.full_name, \
+ ice_user_profile.email, reviewer_table.full_name, " +\
+ "reviewer_table.email, \
ice_deployment_target.version, ice_ecomp_release.name \
- FROM ice_engagement LEFT JOIN ice_vf ON engagement_id = ice_engagement.uuid \
- LEFT JOIN ice_user_profile reviewer_table ON reviewer_table.id = ice_engagement.reviewer_id \
- LEFT JOIN ice_user_profile ON ice_user_profile.id = ice_engagement.peer_reviewer_id \
- LEFT JOIN ice_deployment_target ON ice_deployment_target.uuid = ice_vf.deployment_target_id \
- LEFT JOIN ice_ecomp_release ON ice_ecomp_release.uuid = ice_vf.ecomp_release_id \
+ FROM ice_engagement LEFT JOIN ice_vf ON engagement_id " +\
+ "= ice_engagement.uuid \
+ LEFT JOIN ice_user_profile reviewer_table ON " +\
+ "reviewer_table.id = ice_engagement.reviewer_id \
+ LEFT JOIN ice_user_profile ON ice_user_profile.id = " +\
+ "ice_engagement.peer_reviewer_id \
+ LEFT JOIN ice_deployment_target ON " +\
+ "ice_deployment_target.uuid = " +\
+ "ice_vf.deployment_target_id \
+ LEFT JOIN ice_ecomp_release ON " +\
+ "ice_ecomp_release.uuid = ice_vf.ecomp_release_id \
WHERE ice_user_profile.id IS NOT NULL LIMIT 1;"
list_of_values = DBGeneral.select_query(queryStr, return_type="list")
- list_of_keys = ["engagement_uuid", "engagement_manual_id", "vfName", "pr_name",
- "pr_email", "el_name", "el_email", "target_aic", "ecomp_release"]
+ list_of_keys = [
+ "engagement_uuid",
+ "engagement_manual_id",
+ "vfName",
+ "pr_name",
+ "pr_email",
+ "el_name",
+ "el_email",
+ "target_aic",
+ "ecomp_release"]
return dict(zip(list_of_keys, list_of_values))
@staticmethod
def insert_aic_version(ui_visibility="TRUE"):
new_aic_version = {
- "uuid": str(uuid.uuid4()), "name": "AIC", "version": DBBridge.helper_rand_string("randomNumber", 2), "ui_visibility": ui_visibility, "weight": 0}
+ "uuid": str(
+ uuid.uuid4()),
+ "name": "AIC",
+ "version": DBBridge.helper_rand_string(
+ "randomNumber",
+ 2),
+ "ui_visibility": ui_visibility,
+ "weight": 0}
queryStr = "INSERT INTO public.ice_deployment_target( \
uuid, name, version, ui_visibility, weight) \
- VALUES ('%s', '%s', '%s', '%s', %s);" % (new_aic_version['uuid'], new_aic_version['name'], new_aic_version['version'], new_aic_version['ui_visibility'], new_aic_version['weight'])
+ VALUES " +\
+ "('%s', '%s', '%s', '%s', %s);" % (
+ new_aic_version['uuid'],
+ new_aic_version['name'],
+ new_aic_version['version'],
+ new_aic_version['ui_visibility'],
+ new_aic_version['weight'])
DBGeneral.insert_query(queryStr)
return new_aic_version
@staticmethod
def delete_aic_version(aic_uuid):
DBGeneral.insert_query(
- "DELETE FROM public.ice_deployment_target WHERE uuid='%s';" % aic_uuid)
+ "DELETE FROM public.ice_deployment_target WHERE uuid='%s';" %
+ aic_uuid)
@staticmethod
def change_aic_version_weight(new_weight, old_weight):
DBGeneral.insert_query(
- "UPDATE public.ice_deployment_target SET weight=%s WHERE weight=%s" % (new_weight, old_weight))
+ "UPDATE public.ice_deployment_target " +
+ "SET weight=%s " % new_weight +
+ "WHERE weight=%s" % old_weight)
@staticmethod
def change_ecomp_release_weight(new_weight, old_weight):
DBGeneral.insert_query(
- "UPDATE public.ice_ecomp_release SET weight=%s WHERE weight=%s" % (new_weight, old_weight))
+ "UPDATE public.ice_ecomp_release SET weight=%s WHERE weight=%s" %
+ (new_weight, old_weight))
@staticmethod
def select_aic_version_uuid(aic_version):
- return DBGeneral.select_where("uuid", "ice_deployment_target", "version", aic_version, 1)
+ return DBGeneral.select_where(
+ "uuid", "ice_deployment_target", "version", aic_version, 1)
@staticmethod
def select_ecomp_release_uuid(ecomp_release):
- return DBGeneral.select_where("uuid", "ice_ecomp_release", "name", ecomp_release, 1)
+ return DBGeneral.select_where(
+ "uuid", "ice_ecomp_release", "name", ecomp_release, 1)
@staticmethod
def add_admin_to_eng_team(eng_uuid):
admin_db_id = DBGeneral.select_where(
- 'id', Constants.DBConstants.IceTables.USER_PROFILE, 'email', Constants.Users.Admin.EMAIL, 1)
- queryStr = "INSERT INTO public.ice_engagement_engagement_team(engagement_id, iceuserprofile_id) VALUES ('%s', '%s');" % (
- eng_uuid, admin_db_id)
+ 'id',
+ Constants.DBConstants.IceTables.USER_PROFILE,
+ 'email',
+ Constants.Users.Admin.EMAIL,
+ 1)
+ queryStr = "INSERT INTO public.ice_engagement_engagement_team" +\
+ "(engagement_id, iceuserprofile_id) VALUES ('%s', '%s');" % (
+ eng_uuid, admin_db_id)
logger.debug("add_admin_to_eng_team Query: %s" % queryStr)
DBGeneral.insert_query(queryStr)
@staticmethod
def remove_engagement_from_recent(vf_uuid):
DBGeneral.insert_query(
- "DELETE FROM %s WHERE vf_id='%s'" % (Constants.DBConstants.IceTables.RECENT, vf_uuid))
+ "DELETE FROM %s WHERE vf_id='%s'" % (Constants.DBConstants.
+ IceTables.RECENT, vf_uuid))