aboutsummaryrefslogtreecommitdiffstats
path: root/services/database
diff options
context:
space:
mode:
Diffstat (limited to 'services/database')
-rw-r--r--services/database/__init__.py38
-rw-r--r--services/database/__pycache__/__init__.cpython-36.pycbin0 -> 145 bytes
-rw-r--r--services/database/__pycache__/db_bridge.cpython-36.pycbin0 -> 1251 bytes
-rw-r--r--services/database/__pycache__/db_checklist.cpython-36.pycbin0 -> 12128 bytes
-rw-r--r--services/database/__pycache__/db_cms.cpython-36.pycbin0 -> 8742 bytes
-rw-r--r--services/database/__pycache__/db_general.cpython-36.pycbin0 -> 10828 bytes
-rw-r--r--services/database/__pycache__/db_user.cpython-36.pycbin0 -> 11452 bytes
-rw-r--r--services/database/__pycache__/db_virtual_function.cpython-36.pycbin0 -> 8197 bytes
-rw-r--r--services/database/db_bridge.py60
-rw-r--r--services/database/db_checklist.py386
-rw-r--r--services/database/db_cms.py265
-rwxr-xr-xservices/database/db_general.py416
-rw-r--r--services/database/db_user.py417
-rw-r--r--services/database/db_virtual_function.py227
14 files changed, 1809 insertions, 0 deletions
diff --git a/services/database/__init__.py b/services/database/__init__.py
new file mode 100644
index 0000000..30d7152
--- /dev/null
+++ b/services/database/__init__.py
@@ -0,0 +1,38 @@
+
+# ============LICENSE_START==========================================
+# org.onap.vvp/test-engine
+# ===================================================================
+# Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+# ===================================================================
+#
+# Unless otherwise specified, all software contained herein is licensed
+# under the Apache License, Version 2.0 (the “License”);
+# you may not use this software except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+#
+#
+# Unless otherwise specified, all documentation contained herein is licensed
+# under the Creative Commons License, Attribution 4.0 Intl. (the “License”);
+# you may not use this documentation except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# https://creativecommons.org/licenses/by/4.0/
+#
+# Unless required by applicable law or agreed to in writing, documentation
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# ============LICENSE_END============================================
+#
+# ECOMP is a trademark and service mark of AT&T Intellectual Property.
diff --git a/services/database/__pycache__/__init__.cpython-36.pyc b/services/database/__pycache__/__init__.cpython-36.pyc
new file mode 100644
index 0000000..12ed104
--- /dev/null
+++ b/services/database/__pycache__/__init__.cpython-36.pyc
Binary files differ
diff --git a/services/database/__pycache__/db_bridge.cpython-36.pyc b/services/database/__pycache__/db_bridge.cpython-36.pyc
new file mode 100644
index 0000000..06fb01e
--- /dev/null
+++ b/services/database/__pycache__/db_bridge.cpython-36.pyc
Binary files differ
diff --git a/services/database/__pycache__/db_checklist.cpython-36.pyc b/services/database/__pycache__/db_checklist.cpython-36.pyc
new file mode 100644
index 0000000..81e29bd
--- /dev/null
+++ b/services/database/__pycache__/db_checklist.cpython-36.pyc
Binary files differ
diff --git a/services/database/__pycache__/db_cms.cpython-36.pyc b/services/database/__pycache__/db_cms.cpython-36.pyc
new file mode 100644
index 0000000..783d2f8
--- /dev/null
+++ b/services/database/__pycache__/db_cms.cpython-36.pyc
Binary files differ
diff --git a/services/database/__pycache__/db_general.cpython-36.pyc b/services/database/__pycache__/db_general.cpython-36.pyc
new file mode 100644
index 0000000..c39ce12
--- /dev/null
+++ b/services/database/__pycache__/db_general.cpython-36.pyc
Binary files differ
diff --git a/services/database/__pycache__/db_user.cpython-36.pyc b/services/database/__pycache__/db_user.cpython-36.pyc
new file mode 100644
index 0000000..46d136e
--- /dev/null
+++ b/services/database/__pycache__/db_user.cpython-36.pyc
Binary files differ
diff --git a/services/database/__pycache__/db_virtual_function.cpython-36.pyc b/services/database/__pycache__/db_virtual_function.cpython-36.pyc
new file mode 100644
index 0000000..8a46fb8
--- /dev/null
+++ b/services/database/__pycache__/db_virtual_function.cpython-36.pyc
Binary files differ
diff --git a/services/database/db_bridge.py b/services/database/db_bridge.py
new file mode 100644
index 0000000..fc765c7
--- /dev/null
+++ b/services/database/db_bridge.py
@@ -0,0 +1,60 @@
+
+# ============LICENSE_START==========================================
+# org.onap.vvp/test-engine
+# ===================================================================
+# Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+# ===================================================================
+#
+# Unless otherwise specified, all software contained herein is licensed
+# under the Apache License, Version 2.0 (the “License”);
+# you may not use this software except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+#
+#
+# Unless otherwise specified, all documentation contained herein is licensed
+# under the Creative Commons License, Attribution 4.0 Intl. (the “License”);
+# you may not use this documentation except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# https://creativecommons.org/licenses/by/4.0/
+#
+# Unless required by applicable law or agreed to in writing, documentation
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# ============LICENSE_END============================================
+#
+# ECOMP is a trademark and service mark of AT&T Intellectual Property.
+class DBBridge:
+
+ """
+ This class helps to use functions inside classes with circular import (dependencies).
+ Use this class only when there is circular import in one of the DB services.
+ """
+
+ @staticmethod
+ def select_personal_next_step(user_email):
+ """select_personal_next_step: Originally can be found under DBUser class."""
+ from services.database.db_user import DBUser
+ return DBUser.select_personal_next_step(user_email)
+
+ @staticmethod
+ def helper_rand_string(type, num=""):
+ from services.helper import Helper
+ return Helper.rand_string(type, num)
+
+ @staticmethod
+ def helper_internal_assert(arg1, arg2):
+ from services.helper import Helper
+ return Helper.internal_assert(arg1, arg2)
diff --git a/services/database/db_checklist.py b/services/database/db_checklist.py
new file mode 100644
index 0000000..15851cf
--- /dev/null
+++ b/services/database/db_checklist.py
@@ -0,0 +1,386 @@
+
+# ============LICENSE_START==========================================
+# org.onap.vvp/test-engine
+# ===================================================================
+# Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+# ===================================================================
+#
+# Unless otherwise specified, all software contained herein is licensed
+# under the Apache License, Version 2.0 (the “License”);
+# you may not use this software except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+#
+#
+# Unless otherwise specified, all documentation contained herein is licensed
+# under the Creative Commons License, Attribution 4.0 Intl. (the “License”);
+# you may not use this documentation except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# https://creativecommons.org/licenses/by/4.0/
+#
+# Unless required by applicable law or agreed to in writing, documentation
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# ============LICENSE_END============================================
+#
+# ECOMP is a trademark and service mark of AT&T Intellectual Property.
+import time
+from uuid import uuid4
+
+from django.utils import timezone
+import psycopg2
+
+from services.constants import Constants
+from services.database.db_general import DBGeneral
+from services.logging_service import LoggingServiceFactory
+from services.session import session
+
+
+logger = LoggingServiceFactory.get_logger()
+
+class DBChecklist:
+
+ @staticmethod
+ def select_where_approval_state(queryColumnName, queryTableName, whereParametrType, whereParametrValue, fetchNum):
+ try:
+ dbConn = psycopg2.connect(
+ DBGeneral.return_db_native_connection('em_db'))
+ dbConn = dbConn
+ cur = dbConn.cursor()
+ queryStr = "select %s from %s Where %s = '%s' and state = 'approval';" % (
+ queryColumnName, queryTableName, whereParametrType, whereParametrValue)
+ logger.debug("Query : " + queryStr)
+ cur.execute(queryStr)
+ if (fetchNum == 0):
+ result = str(cur.fetchall())
+ elif (fetchNum == 1):
+ result = str(cur.fetchone())
+ if(result.find("',)") != -1): # formatting strings e.g uuid
+ result = result.partition('\'')[-1].rpartition('\'')[0]
+ elif(result.find(",)") != -1): # formatting ints e.g id
+ result = result.partition('(')[-1].rpartition(',')[0]
+ dbConn.close()
+ logger.debug("Query result: " + str(result))
+ if result == None:
+ errorMsg = "select_where_approval_state FAILED "
+ logger.error(errorMsg)
+ raise
+ return result
+ # If failed - count the failure and add the error to list of errors.
+ except:
+ errorMsg = "select_where_approval_state FAILED "
+ raise Exception(errorMsg, "select_where_approval_state FAILED")
+
+ @staticmethod
+ def select_where_pr_state(queryColumnName, queryTableName, whereParametrType, whereParametrValue, fetchNum):
+ try:
+ dbConn = psycopg2.connect(
+ DBGeneral.return_db_native_connection('em_db'))
+ dbConn = dbConn
+ cur = dbConn.cursor()
+ queryStr = "select %s from %s Where %s = '%s' and state = 'peer_review';" % (
+ queryColumnName, queryTableName, whereParametrType, whereParametrValue)
+ logger.debug("Query : " + queryStr)
+ cur.execute(queryStr)
+ if (fetchNum == 0):
+ result = str(cur.fetchall())
+ elif (fetchNum == 1):
+ result = str(cur.fetchone())
+ if(result.find("',)") != -1): # formatting strings e.g uuid
+ result = result.partition('\'')[-1].rpartition('\'')[0]
+ elif(result.find(",)") != -1): # formatting ints e.g id
+ result = result.partition('(')[-1].rpartition(',')[0]
+ dbConn.close()
+ if result == None:
+ errorMsg = "select_where_pr_state FAILED "
+ logger.error(errorMsg)
+ raise
+ logger.debug("Query result: " + str(result))
+ return result
+ # If failed - count the failure and add the error to list of errors.
+ except:
+ errorMsg = "select_where FAILED "
+ raise Exception(errorMsg, "select_where")
+
+ @staticmethod
+ def select_where_cl_not_archive(queryColumnName, queryTableName, whereParametrType, whereParametrValue, fetchNum):
+ try:
+ dbConn = psycopg2.connect(
+ DBGeneral.return_db_native_connection('em_db'))
+ dbConn = dbConn
+ cur = dbConn.cursor()
+ queryStr = "select %s from %s Where %s = '%s' and state != 'archive';" % (
+ queryColumnName, queryTableName, whereParametrType, whereParametrValue)
+ logger.debug("Query : " + queryStr)
+ cur.execute(queryStr)
+ if (fetchNum == 0):
+ result = str(cur.fetchall())
+ elif (fetchNum == 1):
+ result = str(cur.fetchone())
+ if(result.find("',)") != -1): # formatting strings e.g uuid
+ result = result.partition('\'')[-1].rpartition('\'')[0]
+ elif(result.find(",)") != -1): # formatting ints e.g id
+ result = result.partition('(')[-1].rpartition(',')[0]
+ dbConn.close()
+ logger.debug("Query result: " + str(result))
+ return result
+ # If failed - count the failure and add the error to list of errors.
+ except:
+ errorMsg = "select_where FAILED "
+ raise Exception(errorMsg, "select_where")
+
+ @staticmethod
+ def select_native_where(queryColumnName, queryTableName, whereParametrType, whereParametrValue, fetchNum):
+ try:
+ dbConn = psycopg2.connect(
+ DBGeneral.return_db_native_connection('em_db'))
+ dbConn = dbConn
+ cur = dbConn.cursor()
+ queryStr = "select %s from %s Where %s = '%s';" % (
+ queryColumnName, queryTableName, whereParametrType, whereParametrValue)
+ logger.debug("Query : " + queryStr)
+ cur.execute(queryStr)
+ if (fetchNum == 0):
+ result = str(cur.fetchall())
+ elif (fetchNum == 1):
+ result = str(cur.fetchone())
+ if(result.find("',)") != -1): # formatting strings e.g uuid
+ result = result.partition('\'')[-1].rpartition('\'')[0]
+ elif(result.find(",)") != -1): # formatting ints e.g id
+ result = result.partition('(')[-1].rpartition(',')[0]
+ dbConn.close()
+ logger.debug("Query result: " + str(result))
+ return result
+ # If failed - count the failure and add the error to list of errors.
+ except:
+ errorMsg = "select_where FAILED "
+ raise Exception(errorMsg, "select_where")
+
+ @staticmethod
+ def update_checklist_to_review_state(queryTableName):
+ try:
+ dbConn = psycopg2.connect(
+ DBGeneral.return_db_native_connection('em_db'))
+ dbConn = dbConn
+ cur = dbConn.cursor()
+ queryStr = "UPDATE ice_checklist SET state='review' Where name= '%s' and state= 'pending';" % (
+ queryTableName)
+ logger.debug("Query : " + queryStr)
+ cur.execute(queryStr)
+ dbConn.commit()
+ dbConn.close()
+ # If failed - count the failure and add the error to list of errors.
+ except:
+ errorMsg = "Could not Update User"
+ raise Exception(errorMsg, "Update")
+
+ @staticmethod
+ def update_all_decisions_to_approve(whereParametrValue):
+ try:
+ dbConn = psycopg2.connect(
+ DBGeneral.return_db_native_connection('em_db'))
+ dbConn = dbConn
+ cur = dbConn.cursor()
+ queryStr = "UPDATE ice_checklist_decision SET review_value='approved' , peer_review_value='approved' Where checklist_id = '%s';" % (
+ whereParametrValue)
+ logger.debug(queryStr)
+ cur.execute(queryStr)
+ dbConn.commit()
+ logger.debug("Query : " + queryStr)
+ # If failed - count the failure and add the error to list of errors.
+ except Exception as e:
+ errorMsg = "Could not Update User"
+ logger.debug(e)
+ raise Exception(errorMsg, "Update")
+ finally:
+ dbConn.close()
+
+ @staticmethod
+ def is_archive(checklistName):
+ try:
+ result = False
+ # Fetch all AT&T user ID.
+ checklist_ids = DBGeneral.select_where(
+ "uuid", "ice_checklist", "name", checklistName, 0)
+# checklist_ids = DBGeneral.list_format(checklist_ids)
+ for checklist_id in checklist_ids: # Second Example
+ if isinstance(checklist_id, tuple):
+ checklist_id = checklist_id[0]
+ state = DBGeneral.select_where(
+ "state", "ice_checklist", "uuid", checklist_id, 1)
+ if state == "archive":
+ result = True
+ break
+ return result
+ # If failed - count the failure and add the error to list of errors.
+ except:
+ errorMsg = "is_archive FAILED "
+ raise Exception(errorMsg, "is_archive")
+
+ @staticmethod
+ def get_pr_email(checklistUuid):
+ try:
+ # Fetch one AT&T user ID.
+ owner_id = DBChecklist.select_where_pr_state(
+ "owner_id", "ice_checklist", "uuid", checklistUuid, 1)
+ engLeadEmail = DBGeneral.select_where(
+ "email", "ice_user_profile", "id", owner_id, 1)
+ logger.debug("get_pr_email = " + engLeadEmail)
+ return engLeadEmail
+ # If failed - count the failure and add the error to list of errors.
+ except Exception as e:
+ errorMsg = "get_pr_email FAILED " + str(e)
+ raise Exception(errorMsg, "get_pr_email")
+
+ @staticmethod
+ def get_admin_email(checklistUuid):
+ try:
+ owner_id = DBChecklist.select_where_approval_state(
+ "owner_id", "ice_checklist", "uuid", checklistUuid, 1) # Fetch one AT&T user ID.
+ engLeadEmail = DBGeneral.select_where(
+ "email", "ice_user_profile", "id", owner_id, 1)
+ logger.debug("get_admin_email = " + engLeadEmail)
+ return engLeadEmail
+ # If failed - count the failure and add the error to list of errors.
+ except:
+ errorMsg = "get_admin_email FAILED "
+ raise Exception(errorMsg, "get_admin_email")
+
+ @staticmethod
+ def get_owner_email(checklistUuid):
+ try:
+ # Fetch one AT&T user ID.
+ owner_id = DBChecklist.select_native_where(
+ "owner_id", "ice_checklist", "uuid", checklistUuid, 1)
+ engLeadEmail = DBGeneral.select_where(
+ "email", "ice_user_profile", "id", owner_id, 1)
+ logger.debug("getPreeReviewerEngLeadEmail = " + engLeadEmail)
+ return engLeadEmail
+ # If failed - count the failure and add the error to list of errors.
+ except:
+ errorMsg = "get_admin_email FAILED "
+ raise Exception(errorMsg, "get_owner_email")
+
+ @staticmethod
+ def update_decisions(checklistUuid, checklistName):
+ checklistTempid = DBGeneral.select_where(
+ "template_id", "ice_checklist", "name", checklistName, 1)
+ checklistLineItems = DBGeneral.select_where_and(
+ "uuid", "ice_checklist_line_item", "line_type", "auto", "template_id", checklistTempid, 0)
+ for lineItem in checklistLineItems:
+ setParametrType2 = "peer_review_value"
+ setParametrValue2 = "approved"
+ whereParametrType2 = "lineitem_id"
+ whereParametrValue2 = lineItem
+ DBGeneral.update_where_and("ice_checklist_decision", "review_value", checklistUuid, "approved",
+ "checklist_id", setParametrType2, setParametrValue2, whereParametrType2, whereParametrValue2)
+
+ @staticmethod
+ def checkChecklistIsUpdated():
+ query = "select uuid from ice_checklist_section where template_id in (select template_id from ice_checklist_template where name='{template_name}') and name='{section_name}'".format(
+ template_name=Constants.Dashboard.LeftPanel.EditChecklistTemplate.HEAT, section_name=Constants.Dashboard.LeftPanel.EditChecklistTemplate.HEAT)
+ return DBGeneral.select_query(query)
+
+ @staticmethod
+ def fetchEngByVfName(vfName):
+ # Fetch one AT&T user ID.
+ return DBGeneral.select_where("engagement_id", "ice_vf", "name", vfName, 1)
+
+ @staticmethod
+ def fetchEngManIdByEngUuid(engagement_id):
+ return DBGeneral.select_where("engagement_manual_id", "ice_engagement", "uuid", engagement_id, 1)
+
+ @staticmethod
+ def fetchChecklistByName(checklistName):
+ query = "select uuid from ice_checklist where name='{cl_name}'".format(
+ cl_name=checklistName)
+ return DBGeneral.select_query(query)
+
+ @staticmethod
+ def create_default_heat_teampleate():
+ template_query = "INSERT INTO public.ice_checklist_template(uuid, name, category, version, create_time)"\
+ "VALUES ('%s', '%s', '%s', '%s', '%s');" % (
+ str(uuid4()), 'Editing Heat', 'first category', '1', timezone.now())
+ DBGeneral.insert_query(template_query)
+ template_id = DBGeneral.select_query(
+ "SELECT uuid FROM public.ice_checklist_template where name = 'Editing Heat'")
+ # SECTIONS
+ section1_query = "INSERT INTO public.ice_checklist_section(uuid, name, weight, description, validation_instructions, create_time, template_id) "\
+ "VALUES ('%s', '%s', '%s', '%s', '%s','%s', '%s');" % (str(uuid4()), 'External References',
+ '1', 'section descripyion', 'valid instructions', timezone.now(), template_id)
+ DBGeneral.insert_query(section1_query)
+ section1_id = DBGeneral.select_query(
+ ("""SELECT uuid FROM public.ice_checklist_section where name = 'External References' and template_id = '{s}'""").format(s=template_id))
+ section2_query = "INSERT INTO public.ice_checklist_section(uuid, name, weight, description, validation_instructions, create_time, template_id) "\
+ "VALUES ('%s', '%s', '%s', '%s', '%s','%s', '%s');" % (str(uuid4()), 'Parameter Specification',
+ '2', 'section descripyion', 'valid instructions', timezone.now(), template_id)
+ DBGeneral.insert_query(section2_query)
+ section2_id = DBGeneral.select_query(
+ ("""SELECT uuid FROM public.ice_checklist_section where name = 'Parameter Specification' and template_id = '{s}'""").format(s=template_id))
+ # Line items
+ line_item1 = "INSERT INTO public.ice_checklist_line_item(uuid, name, weight, description, line_type, validation_instructions,create_time,section_id, template_id) "\
+ "VALUES ('%s', '%s', '%s', '%s', '%s','%s', '%s', '%s', '%s');" % (str(uuid4()), 'Normal references', '1', 'Numeric parameters should include range and/or allowed values.', 'manual',
+ 'Here are some useful tips for how to validate this item in the most awesome way:<br><br><ul><li>Here is my awesome tip 1</li><li>Here is my awesome tip 2</li><li>Here is my awesome tip 3</li></ul>', timezone.now(), section1_id, template_id)
+ DBGeneral.insert_query(line_item1)
+ line_item2 = "INSERT INTO public.ice_checklist_line_item(uuid, name, weight, description, line_type, validation_instructions,create_time, section_id, template_id) "\
+ "VALUES ('%s', '%s', '%s', '%s', '%s','%s', '%s', '%s', '%s');" % (str(uuid4()), 'String parameters', '2', 'Numeric parameters should include range and/or allowed values.', 'auto',
+ 'Here are some useful tips for how to validate this item in the most awesome way:<br><br><ul><li>Here is my awesome tip 1</li><li>Here is my awesome tip 2</li><li>Here is my awesome tip 3</li></ul>', timezone.now(), section2_id, template_id)
+ DBGeneral.insert_query(line_item2)
+ line_item3 = "INSERT INTO public.ice_checklist_line_item(uuid, name, weight, description, line_type, validation_instructions,create_time,section_id, template_id) "\
+ "VALUES ('%s', '%s', '%s', '%s', '%s','%s', '%s', '%s', '%s');" % (str(uuid4()), 'Numeric parameters', '3', 'Numeric parameters should include range and/or allowed values.', 'manual',
+ 'Here are some useful tips for how to validate this item in the most awesome way:<br><br><ul><li>Here is my awesome tip 1</li><li>Here is my awesome tip 2</li><li>Here is my awesome tip 3</li></ul>', timezone.now(), section2_id, template_id)
+ DBGeneral.insert_query(line_item3)
+ line_item4 = "INSERT INTO public.ice_checklist_line_item(uuid, name, weight, description, line_type, validation_instructions,create_time, section_id, template_id) "\
+ "VALUES ('%s', '%s', '%s', '%s', '%s','%s', '%s', '%s', '%s');" % (str(uuid4()), 'VF image', '2', 'Numeric parameters should include range and/or allowed values.', 'auto',
+ 'Here are some useful tips for how to validate this item in the most awesome way:<br><br><ul><li>Here is my awesome tip 1</li><li>Here is my awesome tip 2</li><li>Here is my awesome tip 3</li></ul>', timezone.now(), section1_id, template_id)
+ DBGeneral.insert_query(line_item4)
+ line_item5 = "INSERT INTO public.ice_checklist_line_item(uuid, name, weight, description, line_type, validation_instructions,create_time,section_id, template_id) "\
+ "VALUES ('%s', '%s', '%s', '%s', '%s','%s', '%s', '%s', '%s');" % (str(uuid4()), 'Parameters', '1', 'Numeric parameters should include range and/or allowed values.', 'auto',
+ 'Here are some useful tips for how to validate this item in the most awesome way:<br><br><ul><li>Here is my awesome tip 1</li><li>Here is my awesome tip 2</li><li>Here is my awesome tip 3</li></ul>', timezone.now(), section2_id, template_id)
+ DBGeneral.insert_query(line_item5)
+
+ @staticmethod
+ def create_editing_cl_template_if_not_exist():
+ template_id = DBGeneral.select_query(("""SELECT uuid FROM public.ice_checklist_template where name = '{s}'""").format(
+ s=Constants.Dashboard.LeftPanel.EditChecklistTemplate.HEAT))
+ if template_id == 'None':
+ DBChecklist.create_default_heat_teampleate()
+ session.createTemplatecount = True
+
+ @staticmethod
+ def state_changed(identify_field, field_value, expected_state):
+ get_state = str(DBGeneral.select_where(
+ "state", Constants.DBConstants.IceTables.CHECKLIST, identify_field, field_value, 1))
+ counter = 0
+ while get_state != expected_state and counter <= Constants.DBConstants.RETRIES_NUMBER:
+ time.sleep(session.wait_until_time_pause_long)
+ logger.debug("Checklist state not changed yet , expecting state: %s, current result: %s (attempt %s of %s)" % (
+ expected_state, get_state, counter, Constants.DBConstants.RETRIES_NUMBER))
+ counter += 1
+ get_state = str(DBGeneral.select_where(
+ "state", Constants.DBConstants.IceTables.CHECKLIST, identify_field, field_value, 1))
+
+ if get_state == expected_state:
+ logger.debug("Checklist state was successfully changed into: " +
+ expected_state + ", and was verified over the DB")
+ return expected_state
+ raise Exception(
+ "Expected checklist state never arrived " + expected_state, get_state)
+
+ @staticmethod
+ def get_recent_checklist_uuid(name):
+ required_uuid = DBGeneral.select_where_not_and_order_by_desc(
+ 'uuid', Constants.DBConstants.IceTables.CHECKLIST, 'name', name, 'state', Constants.ChecklistStates.Archive.TEXT, 'create_time')
+ return required_uuid
diff --git a/services/database/db_cms.py b/services/database/db_cms.py
new file mode 100644
index 0000000..3c2b2c6
--- /dev/null
+++ b/services/database/db_cms.py
@@ -0,0 +1,265 @@
+
+# ============LICENSE_START==========================================
+# org.onap.vvp/test-engine
+# ===================================================================
+# Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+# ===================================================================
+#
+# Unless otherwise specified, all software contained herein is licensed
+# under the Apache License, Version 2.0 (the “License”);
+# you may not use this software except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+#
+#
+# Unless otherwise specified, all documentation contained herein is licensed
+# under the Creative Commons License, Attribution 4.0 Intl. (the “License”);
+# you may not use this documentation except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# https://creativecommons.org/licenses/by/4.0/
+#
+# Unless required by applicable law or agreed to in writing, documentation
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# ============LICENSE_END============================================
+#
+# ECOMP is a trademark and service mark of AT&T Intellectual Property.
+import psycopg2
+from wheel.signatures import assertTrue
+
+from services.constants import Constants
+from services.database.db_general import DBGeneral
+from services.frontend.base_actions.click import Click
+from services.frontend.base_actions.enter import Enter
+from services.frontend.base_actions.wait import Wait
+from services.frontend.fe_dashboard import FEDashboard
+from services.frontend.fe_general import FEGeneral
+from services.frontend.fe_user import FEUser
+from services.helper import Helper
+from services.logging_service import LoggingServiceFactory
+from services.session import session
+
+
+logger = LoggingServiceFactory.get_logger()
+
+
+class DBCMS:
+
+ @staticmethod
+ def insert_query(queryStr):
+ try:
+ nativeIceDb = psycopg2.connect(
+ DBGeneral.return_db_native_connection('cms_db'))
+ dbConn = nativeIceDb
+ cur = dbConn.cursor()
+ logger.debug("Query: " + queryStr)
+ cur.execute(queryStr)
+ dbConn.commit()
+ dbConn.close()
+ logger.debug("Insert query success!")
+ # If failed - count the failure and add the error to list of errors.
+ except:
+ raise Exception("Couldn't fetch answer using the given query.")
+
+ @staticmethod
+ def update_query(queryStr):
+ try:
+ nativeIceDb = psycopg2.connect(
+ DBGeneral.return_db_native_connection('cms_db'))
+ dbConn = nativeIceDb
+ cur = dbConn.cursor()
+ logger.debug("Query: " + queryStr)
+ cur.execute(queryStr)
+ dbConn.commit()
+ dbConn.close()
+ logger.debug("Update query success!")
+ # If failed - count the failure and add the error to list of errors.
+ except:
+ raise Exception("Couldn't fetch answer using the given query.")
+
+ @staticmethod
+ def select_query(queryStr):
+ try:
+ nativeIceDb = psycopg2.connect(
+ DBGeneral.return_db_native_connection('cms_db'))
+ dbConn = nativeIceDb
+ cur = dbConn.cursor()
+ logger.debug("Query: " + queryStr)
+ cur.execute(queryStr)
+ result = str(cur.fetchone())
+ if(result.find("',)") != -1): # formatting strings e.g uuid
+ result = result.partition('\'')[-1].rpartition('\'')[0]
+ elif(result.find(",)") != -1): # formatting ints e.g id
+ result = result.partition('(')[-1].rpartition(',')[0]
+ dbConn.close()
+ logger.debug("Query result: " + str(result))
+ return result
+ # If failed - count the failure and add the error to list of errors.
+ except:
+ raise Exception("Couldn't fetch answer using the given query.")
+
+ @staticmethod
+ def get_cms_category_id(categoryName):
+ logger.debug("Get DBCMS category id for name: " + categoryName)
+ queryStr = "SELECT id FROM public.blog_blogcategory WHERE title = '%s' LIMIT 1;" % (
+ categoryName)
+ logger.debug("Query : " + queryStr)
+ result = DBCMS.select_query(queryStr)
+ return result
+
+ @staticmethod
+ def insert_cms_new_post(title, description, categoryName):
+ logger.debug("Insert new post : " + title)
+ queryStr = "INSERT INTO public.blog_blogpost" \
+ "(comments_count, keywords_string, rating_count, rating_sum, rating_average, title, slug, _meta_title, description, gen_description, created, updated, status, publish_date, expiry_date, short_url, in_sitemap, content, allow_comments, featured_image, site_id, user_id) "\
+ "VALUES (0, '', 0, 0, 0, '%s', '%s-slug', '', '%s', true, current_timestamp - interval '1 day', current_timestamp - interval '2 day', 2, current_timestamp - interval '1 day', NULL, '', true, '<p>%s</p>', true, '', 1, 1);" % (
+ title, title, description, description)
+ logger.debug("Query : " + queryStr)
+ DBCMS.insert_query(queryStr)
+ post_id = DBCMS.get_last_added_post_id()
+ categoryId = DBCMS.get_cms_category_id(categoryName)
+ DBCMS.add_category_to_post(post_id, categoryId)
+ return post_id
+
+ @staticmethod
+ def get_last_added_post_id():
+ logger.debug("Get the id of the post inserted")
+ queryStr = "select MAX(id) FROM public.blog_blogpost;"
+ logger.debug("Query : " + queryStr)
+ result = DBCMS.select_query(queryStr)
+ return result
+
+ @staticmethod
+ def update_days(xdays, title):
+ logger.debug("Get the id of the post inserted")
+# queryStr = "select MAX(id) FROM public.blog_blogpost;"
+ queryStr = "UPDATE public.blog_blogpost SET created=current_timestamp - interval '%s day' WHERE title='%s';" % (
+ xdays, title)
+ logger.debug("Query : " + queryStr)
+ result = DBCMS.update_query(queryStr)
+ return result
+
+ @staticmethod
+ def add_category_to_post(postId, categoryId):
+ logger.debug("bind category into inserted post: " + postId)
+ queryStr = "INSERT INTO public.blog_blogpost_categories(blogpost_id, blogcategory_id) VALUES (%s, %s);" % (
+ postId, categoryId)
+ logger.debug("Query : " + queryStr)
+ DBCMS.insert_query(queryStr)
+
+ @staticmethod
+ def get_documentation_page_id():
+ logger.debug("Retrive id of documentation page: ")
+ queryStr = "SELECT id FROM public.pages_page WHERE title = 'Documentation' LIMIT 1;"
+ logger.debug("Query : " + queryStr)
+ result = DBCMS.select_query(queryStr)
+ return result
+
+ @staticmethod
+ def get_last_inserted_page_id():
+ logger.debug("Retrive id of last page inserted: ")
+ queryStr = "select MAX(id) FROM public.pages_page;"
+ logger.debug("Query : " + queryStr)
+ result = DBCMS.select_query(queryStr)
+ return result
+
+ @staticmethod
+ def delete_old_tips_of_the_day():
+ logger.debug("Delete all posts ")
+ queryStr = "DELETE FROM public.blog_blogpost_categories WHERE id>0;"
+ logger.debug("Query : " + queryStr)
+ DBCMS.insert_query(queryStr)
+ queryStr = "DELETE FROM public.blog_blogpost WHERE id>0;;"
+ logger.debug("Query : " + queryStr)
+ DBCMS.insert_query(queryStr)
+
+ @staticmethod
+ def insert_page(title, content, parent_id=None):
+ logger.debug("Retrive id of documentation page: ")
+ if parent_id is None:
+ parent_id = DBCMS.get_documentation_page_id()
+ queryStr = "INSERT INTO public.pages_page(" \
+ "keywords_string, title, slug, _meta_title, description, gen_description, created, updated, status, publish_date, expiry_date, short_url, in_sitemap, _order, in_menus, titles, content_model, login_required, parent_id, site_id)" \
+ "VALUES ('', '%s', '%s-slug', '', '%s', true, current_timestamp - interval '1 day', current_timestamp - interval '1 day', 2, current_timestamp - interval '1 day', NULL, '', true, 0, '1,2,3', '%s', 'richtextpage', true, %s, 1);" % (
+ title, title, content, title, parent_id)
+ logger.debug("Query : " + queryStr)
+ DBCMS.insert_query(queryStr)
+
+ createdPageId = DBCMS.get_last_inserted_page_id()
+ logger.debug(
+ "Bind the page with the rich text content related to this page")
+ queryStr = "INSERT INTO public.pages_richtextpage(page_ptr_id, content) VALUES (%s, '<p>%s</p>');" % (
+ createdPageId, content)
+ logger.debug("Query : " + queryStr)
+ DBCMS.insert_query(queryStr)
+ return createdPageId
+
+ @staticmethod
+ def create_faq():
+ title = "title_FAQ" + Helper.rand_string("randomString")
+ description = "description_FAQ_" + Helper.rand_string("randomString")
+ DBCMS.delete_old_tips_of_the_day()
+ postId = DBCMS.insert_cms_new_post(title, description, "FAQ")
+ assertTrue(len(postId) > 0 and not None)
+ return title, description
+
+ @staticmethod
+ def create_news():
+ title = "title_News" + Helper.rand_string("randomString")
+ description = "description_News" + Helper.rand_string("randomString")
+ postId = DBCMS.insert_cms_new_post(title, description, "News")
+ assertTrue(len(postId) > 0 and not None)
+ return title, description
+
+ @staticmethod
+ def create_announcement():
+ title = "title_Announcement_" + Helper.rand_string("randomString")
+ description = "description_Announcement_" + \
+ Helper.rand_string("randomString")
+ postId = DBCMS.insert_cms_new_post(title, description, "Announcement")
+ assertTrue(len(postId) > 0 and not None)
+ return title, description
+
+ @staticmethod
+ def create_page(parent_id=None):
+ title = "title_Of_Page_" + Helper.rand_string("randomString")
+ description = "description_Of_Page_" + \
+ Helper.rand_string("randomString")
+ createdPageId = DBCMS.insert_page(title, description)
+ assertTrue(len(createdPageId) > 0 and not None)
+ return title, description
+
+ @staticmethod
+ def update_X_days_back_post(title, xdays):
+ logger.debug("Get the id of the post inserted")
+ queryStr = "UPDATE blog_blogpost SET created = current_timestamp - interval '%s day', publish_date=current_timestamp - interval '%s day' WHERE title= '%s' ;" % (
+ xdays, xdays, title)
+ logger.debug("Query : " + queryStr)
+ DBCMS.update_query(queryStr)
+
+ @staticmethod
+ def create_announcements(x):
+ listOfTitleAnDescriptions = []
+ for _ in range(x):
+ # print x ->str
+ title = "title_Announcement_" + Helper.rand_string("randomString")
+ description = "description_Announcement_" + \
+ Helper.rand_string("randomString")
+ postId = DBCMS.insert_cms_new_post(
+ title, description, "Announcement")
+ assertTrue(len(postId) > 0 and not None)
+ xList = [title, description]
+ listOfTitleAnDescriptions.append(xList)
+ return listOfTitleAnDescriptions
diff --git a/services/database/db_general.py b/services/database/db_general.py
new file mode 100755
index 0000000..c850d3a
--- /dev/null
+++ b/services/database/db_general.py
@@ -0,0 +1,416 @@
+
+# ============LICENSE_START==========================================
+# org.onap.vvp/test-engine
+# ===================================================================
+# Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+# ===================================================================
+#
+# Unless otherwise specified, all software contained herein is licensed
+# under the Apache License, Version 2.0 (the “License”);
+# you may not use this software except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+#
+#
+# Unless otherwise specified, all documentation contained herein is licensed
+# under the Creative Commons License, Attribution 4.0 Intl. (the “License”);
+# you may not use this documentation except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# https://creativecommons.org/licenses/by/4.0/
+#
+# Unless required by applicable law or agreed to in writing, documentation
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# ============LICENSE_END============================================
+#
+# ECOMP is a trademark and service mark of AT&T Intellectual Property.
+from datetime import datetime
+import sqlite3
+
+from django.conf import settings
+from django.db import transaction
+import psycopg2
+
+from services.logging_service import LoggingServiceFactory
+
+
+logger = LoggingServiceFactory.get_logger()
+
+class DBGeneral:
+
+ @staticmethod
+ # desigredDB: Use 'default' for CI General and 'em_db' for EM General
+ # (according to settings.DATABASES).
+ def return_db_native_connection(desigredDB):
+ dbConnectionStr = "dbname='" + str(settings.SINGLETONE_DB[desigredDB]['NAME']) + \
+ "' user='" + str(settings.SINGLETONE_DB[desigredDB]['USER']) + \
+ "' host='" + str(settings.SINGLETONE_DB[desigredDB]['HOST']) + \
+ "' password='" + str(settings.SINGLETONE_DB[desigredDB]['PASSWORD']) + \
+ "' port='" + \
+ str(settings.SINGLETONE_DB[desigredDB]['PORT']) + "'"
+ return dbConnectionStr
+
+ @staticmethod
+ def insert_results(testType, testFeature, testResult, testName, testDuration, notes=" "):
+ try:
+ if settings.DATABASE_TYPE == 'sqlite':
+ dbfile = str(settings.DATABASES['default']['TEST_NAME'])
+ dbConn = sqlite3.connect(dbfile)
+ cur = dbConn.cursor()
+ else:
+ # Connect to General 'default'.
+ dbConn = psycopg2.connect(
+ DBGeneral.return_db_native_connection("default"))
+ dbConn = dbConn
+ cur = dbConn.cursor()
+ except Exception as e:
+ errorMsg = "Failed to create connection to General." + str(e)
+ raise Exception(errorMsg)
+ try: # Create INSERT query.
+ if settings.DATABASE_TYPE == 'sqlite':
+ query_str = 'INSERT INTO ice_test_results (testType, testFeature, testResult, testName, notes,'\
+ 'create_time, build_id, duration) VALUES (?, ?, ?, ?, ?, ?, ?, ?);'
+ else:
+ query_str = 'INSERT INTO ice_test_results ("testType", "testFeature", "testResult", "testName", notes,'\
+ 'create_time, build_id, duration) VALUES (%s, %s, %s, %s, %s, %s, %s, %s);'
+ cur.execute(query_str, (testType, testFeature, testResult, testName, notes, str(datetime.now()),
+ settings.ICE_BUILD_REPORT_NUM, testDuration))
+ dbConn.commit()
+ logger.debug("Test result in DB - " + testResult)
+ except Exception as e:
+ logger.error(e)
+ errorMsg = "Failed to insert results to DB." + str(e)
+ raise Exception(errorMsg)
+ dbConn.close()
+
+ @staticmethod
+ def select_query(queryStr, return_type="str", fetch_num=1):
+ try:
+ dbConn = psycopg2.connect(
+ DBGeneral.return_db_native_connection('em_db'))
+ dbConn = dbConn
+ cur = dbConn.cursor()
+ logger.debug("Query: " + queryStr)
+ cur.execute(queryStr)
+ if return_type == "str":
+ if fetch_num == 1:
+ result = str(cur.fetchone())
+ else:
+ result = str(cur.fetchall())
+ if result != 'None':
+ # formatting strings e.g uuid
+ if(result.find("',)") != -1):
+ result = result.partition('\'')[-1].rpartition('\'')[0]
+ elif(result.find(",)") != -1): # formatting ints e.g id
+ result = result.partition('(')[-1].rpartition(',')[0]
+ if return_type == "list":
+ if fetch_num == 1:
+ result = list(cur.fetchone())
+ else:
+ result = [item[0] for item in cur.fetchall()]
+ dbConn.close()
+ logger.debug("Query result: " + str(result))
+ return result
+ except:
+ raise Exception("Couldn't fetch answer using the given query.")
+
+ @staticmethod
+ def insert_query(queryStr):
+ try:
+ nativeIceDb = psycopg2.connect(
+ DBGeneral.return_db_native_connection('em_db'))
+ dbConn = nativeIceDb
+ cur = dbConn.cursor()
+ logger.debug("Query: " + queryStr)
+ cur.execute(queryStr)
+ dbConn.commit()
+ dbConn.close()
+ logger.debug("Insert query success!")
+ # If failed - count the failure and add the error to list of errors.
+ except Exception as e:
+ logger.error(e)
+ transaction.rollback()
+ raise Exception("Couldn't fetch answer using the given query.")
+
+ @staticmethod
+ def update_query(queryStr):
+ try:
+ nativeIceDb = psycopg2.connect(
+ DBGeneral.return_db_native_connection('em_db'))
+ dbConn = nativeIceDb
+ cur = dbConn.cursor()
+ logger.debug("Query: " + queryStr)
+ cur.execute(queryStr)
+ dbConn.commit()
+ dbConn.close()
+ logger.debug("Update query success!")
+ # If failed - count the failure and add the error to list of errors.
+ except Exception as e:
+ logger.error(e)
+ transaction.rollback()
+ raise Exception("Couldn't fetch answer using the given query.")
+
+ @staticmethod
+ def select_where_email(queryColumnName, queryTableName, email):
+ try:
+ dbConn = psycopg2.connect(
+ DBGeneral.return_db_native_connection('em_db'))
+ dbConn = dbConn
+ cur = dbConn.cursor()
+ queryStr = "select %s from %s WHERE Email = '%s';" % (
+ queryColumnName, queryTableName, email)
+ logger.debug("Query : " + queryStr)
+ cur.execute(queryStr)
+ result = str(cur.fetchone())
+ if(result.find("',)") != -1): # formatting strings e.g uuid
+ result = result.partition('\'')[-1].rpartition('\'')[0]
+ elif(result.find(",)") != -1): # formatting ints e.g id
+ result = result.partition('(')[-1].rpartition(',')[0]
+ dbConn.close()
+ logger.debug("Query result: " + str(result))
+ return result
+ # If failed - count the failure and add the error to list of errors.
+ except:
+ errorMsg = "select_where_email FAILED "
+ raise Exception(errorMsg, "select_where_email")
+ raise
+
+ @staticmethod
+ def select_from(queryColumnName, queryTableName, fetchNum):
+ try:
+ dbConn = psycopg2.connect(
+ DBGeneral.return_db_native_connection('em_db'))
+ cur = dbConn.cursor()
+ queryStr = "select %s from %s;" % (queryColumnName, queryTableName)
+ logger.debug("Query : " + queryStr)
+ cur.execute(queryStr)
+ if (fetchNum == 0):
+ result = str(cur.fetchall())
+ elif (fetchNum == 1):
+ result = str(cur.fetchone())
+ if(result.find("',)") != -1): # formatting strings e.g uuid
+ result = result.partition('\'')[-1].rpartition('\'')[0]
+ elif(result.find(",)") != -1): # formatting ints e.g id
+ result = result.partition('(')[-1].rpartition(',')[0]
+ dbConn.close()
+ logger.debug("Query result: " + str(result))
+ return result
+ # If failed - count the failure and add the error to list of errors.
+ except Exception as e:
+ errorMsg = "select_from FAILED " + str(e)
+ raise Exception(errorMsg, "select_from")
+
+ @staticmethod
+ def select_where(queryColumnName, queryTableName, whereParametrType, whereParametrValue, fetchNum):
+ try:
+ dbConn = psycopg2.connect(
+ DBGeneral.return_db_native_connection('em_db'))
+ cur = dbConn.cursor()
+ queryStr = "select %s from %s Where %s = '%s';" % (
+ queryColumnName, queryTableName, whereParametrType, whereParametrValue)
+ logger.debug("Query : " + queryStr)
+ cur.execute(queryStr)
+ if (fetchNum == 0):
+ result = list(cur.fetchall())
+ elif (fetchNum == 1):
+ result = str(cur.fetchone())
+ if(result.find("',)") != -1): # formatting strings e.g uuid
+ result = result.partition('\'')[-1].rpartition('\'')[0]
+ elif(result.find(",)") != -1): # formatting ints e.g id
+ result = result.partition('(')[-1].rpartition(',')[0]
+ dbConn.close()
+ logger.debug("Query result: " + str(result))
+ return result
+ # If failed - count the failure and add the error to list of errors.
+ except:
+ errorMsg = "select_where FAILED "
+ raise Exception(errorMsg, "select_where")
+
+ @staticmethod
+ def select_where_order_by_desc(queryColumnName, queryTableName, whereParametrType, whereParametrValue, order_by):
+ dbConn = psycopg2.connect(
+ DBGeneral.return_db_native_connection('em_db'))
+ cur = dbConn.cursor()
+ queryStr = "select %s from %s Where %s = '%s' order by %s desc limit 1;" \
+ % (queryColumnName, queryTableName, whereParametrType, whereParametrValue, order_by)
+ logger.debug("Query : " + queryStr)
+ cur.execute(queryStr)
+ result = str(cur.fetchall())
+ result = DBGeneral.list_format(result)
+ dbConn.close()
+ return result
+
+ @staticmethod
+ def select_where_dict(queryColumnName, queryTableName, whereParametrType):
+ dbConn = psycopg2.connect(
+ DBGeneral.return_db_native_connection('em_db'))
+ cur = dbConn.cursor()
+ x = ""
+ count = 0
+ for key, val in whereParametrType.items():
+ x += "%s='%s'" % (key, val)
+ if len(whereParametrType.items()) - count > 1:
+ x += ' and '
+ count += 1
+ queryStr = "select %s from %s Where %s;" \
+ % (queryColumnName, queryTableName, x)
+ logger.debug("Query : " + queryStr)
+ cur.execute(queryStr)
+ result = str(cur.fetchall())
+ result = DBGeneral.list_format(result)
+ dbConn.close()
+ return result
+
+ @staticmethod
+ def select_where_not_and_order_by_desc(queryColumnName, queryTableName, whereParametrType,
+ whereParametrValue, parametrTypeAnd, parametrAnd, order_by):
+ dbConn = psycopg2.connect(
+ DBGeneral.return_db_native_connection('em_db'))
+ cur = dbConn.cursor()
+ queryStr = "select %s from %s Where %s = '%s' and %s != '%s' order by %s desc limit 1;" \
+ % (queryColumnName, queryTableName, whereParametrType, whereParametrValue,
+ parametrTypeAnd, parametrAnd, order_by)
+ logger.debug("Query : " + queryStr)
+ cur.execute(queryStr)
+ result = str(cur.fetchall())
+ result = DBGeneral.list_format(result)
+ dbConn.close()
+ return result
+
+ @staticmethod
+ def select_where_and(queryColumnName, queryTableName, whereParametrType, whereParametrValue,
+ parametrTypeAnd, parametrAnd, fetchNum):
+ try:
+ dbConn = psycopg2.connect(
+ DBGeneral.return_db_native_connection('em_db'))
+ dbConn = dbConn
+ cur = dbConn.cursor()
+ queryStr = "select %s from %s Where %s = '%s' and %s = '%s';" % (
+ queryColumnName, queryTableName, whereParametrType, whereParametrValue, parametrTypeAnd, parametrAnd)
+ logger.debug("Query : " + queryStr)
+ cur.execute(queryStr)
+ if (fetchNum == 0):
+ result = str(cur.fetchall())
+ result = DBGeneral.list_format(result)
+ elif (fetchNum == 1):
+ result = str(cur.fetchone())
+ if(result.find("',)") != -1): # formatting strings e.g uuid
+ result = result.partition('\'')[-1].rpartition('\'')[0]
+ elif(result.find(",)") != -1): # formatting ints e.g id
+ result = result.partition('(')[-1].rpartition(',')[0]
+ dbConn.close()
+ logger.debug("Query result: " + str(result))
+ return result
+ # If failed - count the failure and add the error to list of errors.
+ except:
+ errorMsg = "select_where_and FAILED "
+ raise Exception(errorMsg, "select_where_and")
+
+ @staticmethod
+ def select_where_is_bigger(queryColumnName, queryTableName, whereParametrType, whereParametrValue, parametrTypeAnd, parametrAnd, fetchNum):
+ try:
+ dbConn = psycopg2.connect(
+ DBGeneral.return_db_native_connection('em_db'))
+ dbConn = dbConn
+ cur = dbConn.cursor()
+ queryStr = "select %s from %s Where %s = '%s' and %s > %s;" % (
+ queryColumnName, queryTableName, whereParametrType, whereParametrValue, parametrTypeAnd, parametrAnd)
+ logger.debug("Query : " + queryStr)
+ cur.execute(queryStr)
+ if (fetchNum == 0):
+ result = cur.fetchall()
+ elif (fetchNum == 1):
+ result = cur.fetchone()
+ if(result.find("',)") != -1): # formatting strings e.g uuid
+ result = result.partition('\'')[-1].rpartition('\'')[0]
+ elif(result.find(",)") != -1): # formatting ints e.g id
+ result = result.partition('(')[-1].rpartition(',')[0]
+ dbConn.close()
+ return result
+ # If failed - count the failure and add the error to list of errors.
+ except:
+ errorMsg = "select_where_is_bigger FAILED "
+ raise Exception(errorMsg, "select_where_is_bigger")
+
+ @staticmethod
+ def update_where(queryTableName, setParametrType, setparametrValue, whereParametrType, whereParametrValue):
+ try:
+ dbConn = psycopg2.connect(
+ DBGeneral.return_db_native_connection('em_db'))
+ dbConn = dbConn
+ cur = dbConn.cursor()
+ queryStr = "UPDATE %s SET %s = '%s' Where %s = '%s';" % (
+ queryTableName, setParametrType, setparametrValue, whereParametrType, whereParametrValue)
+ cur.execute(queryStr)
+ dbConn.commit()
+ logger.debug("Query : " + queryStr)
+ # If failed - count the failure and add the error to list of errors.
+ except Exception as e:
+ errorMsg = "Could not Update User"
+ logger.debug(e)
+ raise Exception(errorMsg, "Update")
+ finally:
+ dbConn.close()
+
+ @staticmethod
+ def update_by_query(queryStr):
+ try:
+ dbConn = psycopg2.connect(
+ DBGeneral.return_db_native_connection('em_db'))
+ dbConn = dbConn
+ cur = dbConn.cursor()
+ logger.debug("Query : " + queryStr)
+ cur.execute(queryStr)
+ dbConn.commit()
+ dbConn.close()
+ # If failed - count the failure and add the error to list of errors.
+ except:
+ errorMsg = "Could not Update User"
+ raise Exception(errorMsg, "Update")
+
+ @staticmethod
+ def update_where_and(queryTableName, setParametrType, parametrValue, changeToValue, whereParametrType, setParametrType2, setParametrValue2, whereParametrType2, whereParametrValue2):
+ try:
+ dbConn = psycopg2.connect(
+ DBGeneral.return_db_native_connection('em_db'))
+ dbConn = dbConn
+ cur = dbConn.cursor()
+ queryStr = "UPDATE %s SET %s = '%s', %s = '%s' Where %s = '%s' and %s = '%s';" % (
+ queryTableName, setParametrType, changeToValue, setParametrType2, setParametrValue2, whereParametrType, parametrValue, whereParametrType2, whereParametrValue2)
+ logger.debug("Query : " + queryStr)
+ cur.execute(queryStr)
+ dbConn.commit()
+ dbConn.close()
+ # If failed - count the failure and add the error to list of errors.
+ except:
+ errorMsg = "Could not Update User"
+ raise Exception(errorMsg, "Update")
+
+ @staticmethod
+ def list_format(un_listed):
+ un_listed = un_listed[1:-1]
+ un_listed = un_listed.replace("',), ('", "|||")
+ un_listed = un_listed.replace("(u'", "") # Format list
+ un_listed = un_listed[1:-1].replace("('", "") # Format list
+ un_listed = un_listed.replace("',)", "") # Format list
+ listed = un_listed[1:-2].split("|||")
+ return listed
+
+ @staticmethod
+ def get_vendors_list():
+ # Select approved vendors from db.
+ vendors_list = DBGeneral.select_where(
+ "name", "ice_vendor", "public", "TRUE", 0)
+ return vendors_list
diff --git a/services/database/db_user.py b/services/database/db_user.py
new file mode 100644
index 0000000..d347dd2
--- /dev/null
+++ b/services/database/db_user.py
@@ -0,0 +1,417 @@
+
+# ============LICENSE_START==========================================
+# org.onap.vvp/test-engine
+# ===================================================================
+# Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+# ===================================================================
+#
+# Unless otherwise specified, all software contained herein is licensed
+# under the Apache License, Version 2.0 (the “License”);
+# you may not use this software except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+#
+#
+# Unless otherwise specified, all documentation contained herein is licensed
+# under the Creative Commons License, Attribution 4.0 Intl. (the “License”);
+# you may not use this documentation except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# https://creativecommons.org/licenses/by/4.0/
+#
+# Unless required by applicable law or agreed to in writing, documentation
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# ============LICENSE_END============================================
+#
+# ECOMP is a trademark and service mark of AT&T Intellectual Property.
+import time
+
+from django.conf import settings
+import psycopg2
+
+from services.constants import Constants
+from services.database.db_bridge import DBBridge
+from services.database.db_general import DBGeneral
+from services.database.db_virtual_function import DBVirtualFunction
+from services.frontend.base_actions.wait import Wait
+from services.logging_service import LoggingServiceFactory
+from services.session import session
+
+
+logger = LoggingServiceFactory.get_logger()
+
+class DBUser:
+
+ @staticmethod
+ def get_activation_url(email):
+ # Fetch one user ID.
+ uuid = DBUser.select_user_uuid(email)
+ # Fetch one user ID.
+ index = DBGeneral.select_where_email("id", "auth_user", email)
+ activation_token = DBGeneral.select_where(
+ "activation_token", "ice_custom_user", "user_ptr_id", index, 1)
+ # / activate /:userID /:token
+ activationUrl = settings.ICE_PORTAL_URL + '/#/activate/' + \
+ str(uuid) + '/' + str(activation_token)
+ logger.debug("activationUrl :" + activationUrl)
+ return activationUrl
+
+ @staticmethod
+ def get_contact_signup_url(invite_token, uuid, email, fullName, phoneNum, companyName):
+ companyId = DBGeneral.select_where(
+ "uuid", "ice_vendor", "name", companyName, 1)
+ signUpURLforContact = settings.ICE_PORTAL_URL + "#/signUp?invitation=" + invite_token + \
+ "&email=" + email + "&full_name=" + fullName + \
+ "&phone_number=" + phoneNum + "&company=" + companyId
+ logger.debug("SignUpURLforContact :" + signUpURLforContact)
+ return signUpURLforContact
+
+ @staticmethod
+ def select_invitation_token(queryColumnName, queryTableName, whereParametrType, whereParametrValue, email, fetchNum):
+ try:
+ dbConn = psycopg2.connect(
+ DBGeneral.return_db_native_connection('em_db'))
+ dbConn = dbConn
+ cur = dbConn.cursor()
+ queryStr = "select %s from %s Where %s = '%s' and email = '%s' ;" % (
+ queryColumnName, queryTableName, whereParametrType, whereParametrValue, email)
+ logger.debug("Query : " + queryStr)
+ cur.execute(queryStr)
+ if (fetchNum == 0):
+ result = str(cur.fetchall())
+ elif (fetchNum == 1):
+ result = str(cur.fetchone())
+ if(result.find("',)") != -1): # formatting strings e.g uuid
+ result = result.partition('\'')[-1].rpartition('\'')[0]
+ elif(result.find(",)") != -1): # formatting ints e.g id
+ result = result.partition('(')[-1].rpartition(',')[0]
+ dbConn.close()
+ if result == None:
+ errorMsg = "select_where_pr_state FAILED "
+ logger.error(errorMsg)
+ raise
+ logger.debug("Query result: " + str(result))
+ return result
+ # If failed - count the failure and add the error to list of errors.
+ except:
+ errorMsg = "select_where FAILED "
+ raise Exception(errorMsg, "select_where")
+
+ @staticmethod
+ def get_el_name(vfName):
+ try:
+ # Fetch one AT&T user ID.
+ engagement_id = DBVirtualFunction.select_eng_uuid(vfName)
+ engagement_manual_id = DBGeneral.select_where(
+ "engagement_manual_id", "ice_engagement", "uuid", engagement_id, 1)
+ reviewer_id = DBGeneral.select_where(
+ "reviewer_id", "ice_engagement", "engagement_manual_id", engagement_manual_id, 1)
+ engLeadFullName = DBGeneral.select_where_and(
+ "full_name", "ice_user_profile", "id", reviewer_id, "role_id", "2", 1)
+ return engLeadFullName
+ # If failed - count the failure and add the error to list of errors.
+ except:
+ errorMsg = "get_el_name FAILED "
+ raise Exception(errorMsg, "get_el_name")
+
+ @staticmethod
+ def get_email_by_full_name(fullname):
+ # try:
+ query_str = "select email from ice_user_profile where full_name = '%s';" % (
+ fullname)
+ user_email = DBGeneral.select_query(query_str)
+ return user_email
+# except: # If failed - count the failure and add the error to list of errors.
+# errorMsg = "get_email_by_full_name FAILED "
+# raise Exception(errorMsg, "get_el_name")
+
+ @staticmethod
+ def select_recent_vf_of_user(user_uuid, fetchNum):
+ try:
+ dbConn = psycopg2.connect(
+ DBGeneral.return_db_native_connection('em_db'))
+ dbConn = dbConn
+ cur = dbConn.cursor()
+ queryStr = "SELECT vf_id FROM public.ice_recent_engagement where user_uuid = '%s' order by last_update desc limit 20;" % (
+ user_uuid)
+ logger.debug("Query : " + queryStr)
+ cur.execute(queryStr)
+ if (fetchNum == 0):
+ result = str(cur.fetchall())
+ elif (fetchNum == 1):
+ result = str(cur.fetchone())
+ if(result.find("',)") != -1): # formatting strings e.g uuid
+ result = result.partition('\'')[-1].rpartition('\'')[0]
+ elif(result.find(",)") != -1): # formatting ints e.g id
+ result = result.partition('(')[-1].rpartition(',')[0]
+ dbConn.close()
+ logger.debug("Query result: " + str(result))
+ return result
+ # If failed - count the failure and add the error to list of errors.
+ except:
+ errorMsg = "select_where FAILED "
+ raise Exception(errorMsg, "select_where")
+
+ @staticmethod
+ def select_el_email(vfName):
+ try:
+ # Fetch one AT&T user ID.
+ engagement_id = DBVirtualFunction.select_eng_uuid(vfName)
+ engagement_manual_id = DBGeneral.select_where(
+ "engagement_manual_id", "ice_engagement", "uuid", engagement_id, 1)
+ reviewer_id = DBGeneral.select_where(
+ "reviewer_id", "ice_engagement", "engagement_manual_id", engagement_manual_id, 1)
+ engLeadEmail = DBGeneral.select_where_and(
+ "email", "ice_user_profile", "id", reviewer_id, "role_id", "2", 1)
+ return engLeadEmail
+ # If failed - count the failure and add the error to list of errors.
+ except:
+ errorMsg = "select_el_email FAILED "
+ raise Exception(errorMsg, "select_el_email")
+
+ @staticmethod
+ def select_user_native_id(email):
+ try:
+ # Fetch one AT&T user ID.
+ engLeadId = DBUser.select_user_profile_property(email, "id")
+ return engLeadId
+ # If failed - count the failure and add the error to list of errors.
+ except:
+ errorMsg = "select_user_native_id FAILED "
+ raise Exception(errorMsg, "select_user_native_id")
+
+ @staticmethod
+ def select_personal_next_step(email):
+ user_id = DBUser.select_user_native_id(email)
+ return DBGeneral.select_where("uuid", "ice_next_step", "owner_id", user_id, 1)
+
+ @staticmethod
+ def select_pr_email(vfName):
+ try:
+ # Fetch one AT&T user ID.
+ engagement_id = DBVirtualFunction.select_eng_uuid(vfName)
+ engagement_manual_id = DBGeneral.select_where(
+ "engagement_manual_id", "ice_engagement", "uuid", engagement_id, 1)
+ reviewer_id = DBGeneral.select_where(
+ "peer_reviewer_id", "ice_engagement", "engagement_manual_id", engagement_manual_id, 1)
+ engLeadEmail = DBGeneral.select_where(
+ "email", "ice_user_profile", "id", reviewer_id, 1)
+ return engLeadEmail
+ # If failed - count the failure and add the error to list of errors.
+ except:
+ errorMsg = "select_el_email FAILED "
+ raise Exception(errorMsg, "select_el_email")
+
+ @staticmethod
+ def get_notification_id_by_email(userEmail):
+ uuid = DBGeneral.select_where_email(
+ "id", "ice_user_profile", userEmail)
+ notifIDs = DBGeneral.select_where(
+ "uuid", "ice_notification", "user_id", uuid, 0)
+ return notifIDs
+
+ @staticmethod
+ def get_not_seen_notifications_number_by_email(user_email, is_negative=False):
+ user_id = DBGeneral.select_where_email(
+ "id", Constants.DBConstants.IceTables.USER_PROFILE, user_email)
+ notifications_number = DBGeneral.select_where_and(
+ Constants.DBConstants.Queries.COUNT, Constants.DBConstants.IceTables.NOTIFICATION, "user_id", user_id, "is_read", "False", 1)
+ if is_negative:
+ counter = 0
+ while notifications_number != "0" and counter <= Constants.Dashboard.Avatar.Notifications.Count.RETRIES_NUMBER:
+ notifications_number = DBGeneral.select_where_and(
+ Constants.DBConstants.Queries.COUNT, Constants.DBConstants.IceTables.NOTIFICATION, "user_id", user_id, "is_read", "False", 1)
+ time.sleep(1)
+ counter += 1
+ return notifications_number
+
+ @staticmethod
+ def get_eng_lead_email_per_enguuid(enguuid):
+ reviewer_id = DBGeneral.select_where(
+ "reviewer_id", Constants.DBConstants.IceTables.ENGAGEMENT, "uuid", enguuid, 1)
+ engLeadEmail = DBGeneral.select_where(
+ "email", Constants.DBConstants.IceTables.USER_PROFILE, "id", reviewer_id, 1)
+ return engLeadEmail
+
+ @staticmethod
+ def select_all_user_engagements(engLeadID):
+ try:
+ dbConn = psycopg2.connect(
+ DBGeneral.return_db_native_connection('em_db'))
+ dbConn = dbConn
+ cur = dbConn.cursor()
+ queryStr = "select COUNT(*) from ice_engagement_engagement_team Where iceuserprofile_id = %s and (select engagement_stage from public.ice_engagement where uuid = engagement_id LIMIT 1) != 'Archived';" % (
+ engLeadID)
+ logger.debug("Query : " + queryStr)
+ cur.execute(queryStr)
+ result = cur.fetchall()
+ dbConn.close()
+ logger.debug("Query result: " + str(result))
+ logger.debug(result[0][0])
+ return result[0][0]
+ # If failed - count the failure and add the error to list of errors.
+ except:
+ errorMsg = "select_user_engagements_by_stage FAILED "
+ raise Exception(errorMsg, "select_user_engagements_by_stage")
+
+ @staticmethod
+ def select_user_engagements_by_stage(stage, engLeadID):
+ try:
+ dbConn = psycopg2.connect(
+ DBGeneral.return_db_native_connection('em_db'))
+ dbConn = dbConn
+ cur = dbConn.cursor()
+ queryStr = "select count(*) from ice_engagement INNER JOIN ice_engagement_engagement_team ON ice_engagement_engagement_team.engagement_id= ice_engagement.uuid Where (ice_engagement.engagement_stage = '%s') and (ice_engagement_engagement_team.iceuserprofile_id = %s );" % (
+ stage, engLeadID)
+ logger.debug("Query : " + queryStr)
+ cur.execute(queryStr)
+ result = cur.fetchall()
+ dbConn.close()
+ logger.debug("Query result: " + str(result))
+ logger.debug(result[0][0])
+ return result[0][0]
+ # If failed - count the failure and add the error to list of errors.
+ except:
+ errorMsg = "select_user_engagements_by_stage FAILED "
+ raise Exception(errorMsg, "select_user_engagements_by_stage")
+
+ @staticmethod
+ def set_new_temp_password(email):
+ encodePass = DBGeneral.select_where_email(
+ "password", "auth_user", Constants.Users.Admin.EMAIL)
+ # Fetch one user ID.
+ index = DBGeneral.select_where_email("id", "auth_user", email)
+ DBGeneral.update_where(
+ "ice_custom_user", "temp_password", encodePass, "user_ptr_id", index)
+
+ @staticmethod
+ def set_password_to_default(email):
+ encodePass = DBGeneral.select_where_email(
+ "password", "auth_user", Constants.Users.Admin.EMAIL)
+ DBGeneral.update_where(
+ "auth_user", "password", encodePass, "email", email)
+
+ @staticmethod
+ def select_el_not_in_engagement(el_name, pr_name):
+ query_str = "select full_name from ice_user_profile where role_id = 2 and full_name != '%s' and full_name != '%s';" % (
+ el_name, pr_name)
+ new_user = DBGeneral.select_query(query_str)
+ if new_user == 'None':
+ new_user = DBUser.update_to_el_not_in_engagement()
+ return new_user
+
+ @staticmethod
+ def select_user_uuid(email):
+ user_uuid = DBUser.select_user_profile_property(email, "uuid")
+ return user_uuid
+
+ @staticmethod
+ def select_access_key(email):
+ access_key = DBUser.select_user_profile_property(email, "rgwa_access_key")
+ return access_key
+
+ @staticmethod
+ def select_secret_key(email):
+ secret_key = DBUser.select_user_profile_property(email, "rgwa_secret_key")
+ return secret_key
+
+ @staticmethod
+ def update_to_el_not_in_engagement():
+ query_str = "select uuid from ice_user_profile where role_id = 1 ;"
+ user_uuid = DBGeneral.select_query(query_str)
+ updatequery = "UPDATE ice_user_profile SET role_id=2 ,full_name = 'el_for_test' WHERE uuid = '%s' ;" % (
+ user_uuid)
+ DBGeneral.update_query(updatequery)
+ updatequery = "UPDATE ice_user_profile SET role_id=2 WHERE full_name = '%s' ;" % (
+ 'el_for_test')
+ DBGeneral.update_query(updatequery)
+ return 'el_for_test'
+
+ @staticmethod
+ def rollback_for_el_not_in_engagement():
+ query_str = "select uuid from ice_user_profile where full_name = 'el_for_test';"
+ user_uuid = DBGeneral.select_query(query_str)
+ fullName = DBBridge.helper_rand_string("randomString")
+ updatequery = "UPDATE ice_user_profile SET role_id=1,full_name = '%s' WHERE uuid = '%s' ;" % (
+ fullName, user_uuid)
+ DBGeneral.update_query(updatequery)
+
+ @staticmethod
+ def set_engagement_peer_reviewer(engagement_uuid, email):
+ user_uuid = DBUser.select_user_uuid(email)
+ update_query = "UPDATE ice_user_profile SET role_id=2 WHERE uuid = '%s';" % user_uuid
+ DBGeneral.update_query(update_query)
+
+ user_id = DBGeneral.select_query(
+ "SELECT id FROM ice_user_profile WHERE uuid = '%s';" % user_uuid)
+ update_query = "UPDATE ice_engagement SET peer_reviewer_id=%s WHERE uuid = '%s';" % (
+ user_id, engagement_uuid)
+ DBGeneral.update_query(update_query)
+
+ @staticmethod
+ def select_user_profile_property(user_email, property_name):
+ return DBGeneral.select_where(property_name, "ice_user_profile", "email", user_email, 1)
+
+ @staticmethod
+ def validate_user_profile_settings_in_db(user_email, checked):
+ Wait.page_has_loaded()
+ regular_email_updates = DBUser.select_user_profile_property(
+ user_email, 'regular_email_updates')
+ DBBridge.helper_internal_assert(regular_email_updates, checked)
+ email_updates_on_every_notification = \
+ DBUser.select_user_profile_property(
+ user_email, 'email_updates_on_every_notification')
+ DBBridge.helper_internal_assert(
+ email_updates_on_every_notification, checked)
+ email_updates_daily_digest = DBUser.select_user_profile_property(
+ user_email, 'email_updates_daily_digest')
+ DBBridge.helper_internal_assert(
+ email_updates_daily_digest, not checked)
+
+ @staticmethod
+ def retrieve_admin_ssh_from_db():
+ ssh_key = DBGeneral.select_where(
+ 'ssh_public_key', Constants.DBConstants.IceTables.USER_PROFILE,
+ 'email', Constants.Users.Admin.EMAIL, 1)
+ return ssh_key
+
+ @staticmethod
+ def get_access_key(user_uuid):
+ counter = 0
+ access_key = DBGeneral.select_where(
+ "rgwa_access_key", Constants.DBConstants.IceTables.USER_PROFILE, "uuid", user_uuid, 1)
+ while access_key == "None" and counter <= Constants.RGWAConstants.RETRIES_NUMBER:
+ time.sleep(session.wait_until_time_pause)
+ logger.debug(
+ "rgwa_access_key are not ready yet, trying again (%s of 20)" % counter)
+ access_key = DBGeneral.select_where(
+ "rgwa_access_key", Constants.DBConstants.IceTables.USER_PROFILE, "uuid", user_uuid, 1)
+ counter += 1
+ return access_key
+
+ @staticmethod
+ def get_access_secret(user_uuid):
+ counter = 0
+ access_secret = DBGeneral.select_where(
+ "rgwa_secret_key", Constants.DBConstants.IceTables.USER_PROFILE, "uuid", user_uuid, 1)
+ while access_secret == "None" and counter <= Constants.RGWAConstants.RETRIES_NUMBER:
+ time.sleep(session.wait_until_time_pause)
+ logger.debug(
+ "rgwa_secret_key are not ready yet, trying again (%s of 100)" % counter)
+ access_secret = DBGeneral.select_where(
+ "rgwa_secret_key", Constants.DBConstants.IceTables.USER_PROFILE, "uuid", user_uuid, 1)
+
+ counter += 1
+ return access_secret
diff --git a/services/database/db_virtual_function.py b/services/database/db_virtual_function.py
new file mode 100644
index 0000000..143bca2
--- /dev/null
+++ b/services/database/db_virtual_function.py
@@ -0,0 +1,227 @@
+
+# ============LICENSE_START==========================================
+# org.onap.vvp/test-engine
+# ===================================================================
+# Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+# ===================================================================
+#
+# Unless otherwise specified, all software contained herein is licensed
+# under the Apache License, Version 2.0 (the “License”);
+# you may not use this software except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+#
+#
+# Unless otherwise specified, all documentation contained herein is licensed
+# under the Creative Commons License, Attribution 4.0 Intl. (the “License”);
+# you may not use this documentation except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# https://creativecommons.org/licenses/by/4.0/
+#
+# Unless required by applicable law or agreed to in writing, documentation
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# ============LICENSE_END============================================
+#
+# ECOMP is a trademark and service mark of AT&T Intellectual Property.
+import uuid
+
+from django.conf import settings
+import psycopg2
+
+from services.constants import Constants
+from services.database.db_bridge import DBBridge
+from services.database.db_general import DBGeneral
+from services.logging_service import LoggingServiceFactory
+
+
+logger = LoggingServiceFactory.get_logger()
+
+class DBVirtualFunction:
+
+ @staticmethod
+ def insert_ecomp_release(uuid, name, ui_visibility="TRUE"):
+ try:
+ queryTableName = "ice_ecomp_release"
+ # Connect to General 'default'.
+ dbConn = psycopg2.connect(
+ DBGeneral.return_db_native_connection("em_db"))
+ dbConn = dbConn
+ cur = dbConn.cursor()
+ logger.debug("DATABASE_TYPE: " + settings.DATABASE_TYPE)
+ except Exception as e:
+ errorMsg = "Failed to create connection to General: " + str(e)
+ raise Exception(errorMsg)
+ try:
+ logger.debug("DATABASE_TYPE: " + settings.DATABASE_TYPE)
+ # Create INSERT query.
+ queryStr = "INSERT INTO %s (""uuid, name, weight, ui_visibility"") VALUES ('%s', '%s', '%s', '%s');" % (
+ queryTableName, uuid, name, 0, ui_visibility)
+ logger.debug("Query: " + queryStr)
+ cur.execute(queryStr) # Execute query.
+ dbConn.commit()
+ logger.debug("Test results are in General now.")
+ except Exception as e:
+ errorMsg = "Failed to insert ECOMP release to General:" + str(e)
+ raise Exception(errorMsg)
+ dbConn.close()
+
+ @staticmethod
+ def delete_ecomp_release(uuid, name):
+ try:
+ queryTableName = "ice_ecomp_release"
+ # Connect to General 'default'.
+ dbConn = psycopg2.connect(
+ DBGeneral.return_db_native_connection("em_db"))
+ dbConn = dbConn
+ cur = dbConn.cursor()
+ except Exception as e:
+ errorMsg = "Failed to create connection to DBGeneral.because :" + \
+ str(e)
+ raise Exception(errorMsg)
+ try:
+ # Create INSERT query.
+ queryStr = "DELETE FROM %s WHERE uuid = '%s';" % (
+ queryTableName, uuid)
+ logger.debug("Query: " + queryStr)
+ cur.execute(queryStr) # Execute query.
+ dbConn.commit()
+ logger.debug("Test results are in General now.")
+ except Exception as e:
+ errorMsg = "Failed to delete ECOMP release from General . because :" + \
+ str(e)
+ raise Exception(errorMsg)
+ raise
+ dbConn.close()
+
+ @staticmethod
+ def select_next_steps_ids(engagement_uuid):
+ ice_next_steps = DBGeneral.select_where(
+ "uuid", "ice_next_step", "engagement_id", engagement_uuid, 0)
+ return ice_next_steps
+
+ @staticmethod
+ def select_next_steps_uuids_by_stage(engagement_uuid, engagement_stage):
+ query = "SELECT uuid FROM %s WHERE engagement_id='%s' AND engagement_stage='%s' ORDER BY position;" % (
+ Constants.DBConstants.IceTables.NEXT_STEP, engagement_uuid, engagement_stage)
+ return DBGeneral.select_query(query, "list", 0)
+
+ @staticmethod
+ def update_next_step_position(next_step_uuid, new_index):
+ DBGeneral.update_where(
+ "ice_next_step", "position", new_index, "uuid", next_step_uuid)
+
+ @staticmethod
+ def select_next_step_description(next_step_uuid):
+ return DBGeneral.select_where("description", "ice_next_step", "uuid", next_step_uuid, 1)
+
+ @staticmethod
+ def select_eng_uuid(vf_name):
+ return DBGeneral.select_where("engagement_id", "ice_vf", "name", vf_name, 1)
+
+ @staticmethod
+ def select_engagment_uuid_by_vf_name(vfName):
+ engagement_id = DBGeneral.select_where(
+ "engagement_id", "ice_vf", "name", vfName, 1)
+ engagement_manual_id = DBGeneral.select_where(
+ "engagement_manual_id", "ice_engagement", "uuid", engagement_id, 1)
+ enguuid = DBGeneral.select_where(
+ "uuid", "ice_engagement", "engagement_manual_id", engagement_manual_id, 1)
+ return enguuid
+
+ @staticmethod
+ def select_vf_version_by_vf_name(vfName):
+ queryStr = "SELECT version FROM ice_vf WHERE name= '%s';" % vfName
+ version_name = str(DBGeneral.select_query(queryStr))
+ return version_name
+
+ @staticmethod
+ def select_vf_name_by_vf_version(version_name):
+ queryofname = "SELECT name FROM ice_vf WHERE version= '%s';" % version_name
+ vfNameDb = str(DBGeneral.select_query(queryofname))
+ return vfNameDb
+
+ @staticmethod
+ def return_expected_steps(engagement_uuid, stage, user_email):
+ steps_uuids = DBVirtualFunction.select_next_steps_uuids_by_stage(
+ engagement_uuid, stage)
+ personal_step_uuid = DBBridge.select_personal_next_step(user_email)
+ DBVirtualFunction.update_next_step_position(personal_step_uuid, 1)
+ steps_uuids.insert(0, personal_step_uuid)
+ return steps_uuids
+
+ @staticmethod
+ def get_engagement():
+ """Use this function instead of creating a new engagement where no need to"""
+ queryStr = "SELECT DISTINCT ice_engagement.uuid, engagement_manual_id, ice_vf.name, ice_user_profile.full_name, \
+ ice_user_profile.email, reviewer_table.full_name, reviewer_table.email, \
+ ice_deployment_target.version, ice_ecomp_release.name \
+ FROM ice_engagement LEFT JOIN ice_vf ON engagement_id = ice_engagement.uuid \
+ LEFT JOIN ice_user_profile reviewer_table ON reviewer_table.id = ice_engagement.reviewer_id \
+ LEFT JOIN ice_user_profile ON ice_user_profile.id = ice_engagement.peer_reviewer_id \
+ LEFT JOIN ice_deployment_target ON ice_deployment_target.uuid = ice_vf.deployment_target_id \
+ LEFT JOIN ice_ecomp_release ON ice_ecomp_release.uuid = ice_vf.ecomp_release_id \
+ WHERE ice_user_profile.id IS NOT NULL LIMIT 1;"
+ list_of_values = DBGeneral.select_query(queryStr, return_type="list")
+ list_of_keys = ["engagement_uuid", "engagement_manual_id", "vfName", "pr_name",
+ "pr_email", "el_name", "el_email", "target_aic", "ecomp_release"]
+ return dict(zip(list_of_keys, list_of_values))
+
+ @staticmethod
+ def insert_aic_version(ui_visibility="TRUE"):
+ new_aic_version = {
+ "uuid": str(uuid.uuid4()), "name": "AIC", "version": DBBridge.helper_rand_string("randomNumber", 2), "ui_visibility": ui_visibility, "weight": 0}
+ queryStr = "INSERT INTO public.ice_deployment_target( \
+ uuid, name, version, ui_visibility, weight) \
+ VALUES ('%s', '%s', '%s', '%s', %s);" % (new_aic_version['uuid'], new_aic_version['name'], new_aic_version['version'], new_aic_version['ui_visibility'], new_aic_version['weight'])
+ DBGeneral.insert_query(queryStr)
+ return new_aic_version
+
+ @staticmethod
+ def delete_aic_version(aic_uuid):
+ DBGeneral.insert_query(
+ "DELETE FROM public.ice_deployment_target WHERE uuid='%s';" % aic_uuid)
+
+ @staticmethod
+ def change_aic_version_weight(new_weight, old_weight):
+ DBGeneral.insert_query(
+ "UPDATE public.ice_deployment_target SET weight=%s WHERE weight=%s" % (new_weight, old_weight))
+
+ @staticmethod
+ def change_ecomp_release_weight(new_weight, old_weight):
+ DBGeneral.insert_query(
+ "UPDATE public.ice_ecomp_release SET weight=%s WHERE weight=%s" % (new_weight, old_weight))
+
+ @staticmethod
+ def select_aic_version_uuid(aic_version):
+ return DBGeneral.select_where("uuid", "ice_deployment_target", "version", aic_version, 1)
+
+ @staticmethod
+ def select_ecomp_release_uuid(ecomp_release):
+ return DBGeneral.select_where("uuid", "ice_ecomp_release", "name", ecomp_release, 1)
+
+ @staticmethod
+ def add_admin_to_eng_team(eng_uuid):
+ admin_db_id = DBGeneral.select_where(
+ 'id', Constants.DBConstants.IceTables.USER_PROFILE, 'email', Constants.Users.Admin.EMAIL, 1)
+ queryStr = "INSERT INTO public.ice_engagement_engagement_team(engagement_id, iceuserprofile_id) VALUES ('%s', '%s');" % (
+ eng_uuid, admin_db_id)
+ logger.debug("add_admin_to_eng_team Query: %s" % queryStr)
+ DBGeneral.insert_query(queryStr)
+
+ @staticmethod
+ def remove_engagement_from_recent(vf_uuid):
+ DBGeneral.insert_query(
+ "DELETE FROM %s WHERE vf_id='%s'" % (Constants.DBConstants.IceTables.RECENT, vf_uuid))