# ============LICENSE_START==========================================
# org.onap.vvp/test-engine
# ===================================================================
# Copyright © 2017 AT&T Intellectual Property. All rights reserved.
# ===================================================================
#
# Unless otherwise specified, all software contained herein is licensed
# under the Apache License, Version 2.0 (the “License”);
# you may not use this software except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
#
#
# Unless otherwise specified, all documentation contained herein is licensed
# under the Creative Commons License, Attribution 4.0 Intl. (the “License”);
# you may not use this documentation except in compliance with the License.
# You may obtain a copy of the License at
#
# https://creativecommons.org/licenses/by/4.0/
#
# Unless required by applicable law or agreed to in writing, documentation
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# ============LICENSE_END============================================
#
# ECOMP is a trademark and service mark of AT&T Intellectual Property.
import time
from uuid import uuid4
from django.utils import timezone
import psycopg2
from services.constants import Constants
from services.database.db_general import DBGeneral
from services.logging_service import LoggingServiceFactory
from services.session import session
logger = LoggingServiceFactory.get_logger()
class DBChecklist:
@staticmethod
def select_where_approval_state(
queryColumnName,
queryTableName,
whereParametrType,
whereParametrValue,
fetchNum):
try:
dbConn = psycopg2.connect(
DBGeneral.return_db_native_connection('em_db'))
dbConn = dbConn
cur = dbConn.cursor()
queryStr = \
"select %s from %s " % (queryColumnName, queryTableName) +\
"Where %s = '%s'" % (whereParametrType, whereParametrValue) +\
" and state = 'approval';"
logger.debug("Query : " + queryStr)
cur.execute(queryStr)
if (fetchNum == 0):
result = str(cur.fetchall())
elif (fetchNum == 1):
result = str(cur.fetchone())
if(result.find("',)") != -1): # formatting strings e.g uuid
result = result.partition('\'')[-1].rpartition('\'')[0]
elif(result.find(",)") != -1): # formatting ints e.g id
result = result.partition('(')[-1].rpartition(',')[0]
dbConn.close()
logger.debug("Query result: " + str(result))
if result is None:
errorMsg = "select_where_approval_state FAILED "
logger.error(errorMsg)
raise
return result
# If failed - count the failure and add the error to list of errors.
except BaseException:
errorMsg = "select_where_approval_state FAILED "
raise Exception(errorMsg, "select_where_approval_state FAILED")
@staticmethod
def select_where_pr_state(
queryColumnName,
queryTableName,
whereParametrType,
whereParametrValue,
fetchNum):
try:
dbConn = psycopg2.connect(
DBGeneral.return_db_native_connection('em_db'))
dbConn = dbConn
cur = dbConn.cursor()
queryStr = \
"select %s from %s " % (queryColumnName, queryTableName) +\
"Where %s = '%s' and " % (
whereParametrType, whereParametrValue) +\
"state = 'peer_review';"
logger.debug("Query : " + queryStr)
cur.execute(queryStr)
if (fetchNum == 0):
result = str(cur.fetchall())
elif (fetchNum == 1):
result = str(cur.fetchone())
if(result.find("',)") != -1): # formatting strings e.g uuid
result = result.partition('\'')[-1].rpartition('\'')[0]
elif(result.find(",)") != -1): # formatting ints e.g id
result = result.partition('(')[-1].rpartition(',')[0]
dbConn.close()
if result is None:
errorMsg = "select_where_pr_state FAILED "
logger.error(errorMsg)
raise
logger.debug("Query result: " + str(result))
return result
# If failed - count the failure and add the error to list of errors.
except BaseException:
errorMsg = "select_where FAILED "
raise Exception(errorMsg, "select_where")
@staticmethod
def select_where_cl_not_archive(
queryColumnName,
queryTableName,
whereParametrType,
whereParametrValue,
fetchNum):
try:
dbConn = psycopg2.connect(
DBGeneral.return_db_native_connection('em_db'))
dbConn = dbConn
cur = dbConn.cursor()
queryStr = \
"select %s from %s " % (queryColumnName, queryTableName) +\
"Where %s = '%s'" % (whereParametrType, whereParametrValue) +\
"and state != 'archive';"
logger.debug("Query : " + queryStr)
cur.execute(queryStr)
if (fetchNum == 0):
result = str(cur.fetchall())
elif (fetchNum == 1):
result = str(cur.fetchone())
if(result.find("',)") != -1): # formatting strings e.g uuid
result = result.partition('\'')[-1].rpartition('\'')[0]
elif(result.find(",)") != -1): # formatting ints e.g id
result = result.partition('(')[-1].rpartition(',')[0]
dbConn.close()
logger.debug("Query result: " + str(result))
return result
# If failed - count the failure and add the error to list of errors.
except BaseException:
errorMsg = "select_where FAILED "
raise Exception(errorMsg, "select_where")
@staticmethod
def select_native_where(
queryColumnName,
queryTableName,
whereParametrType,
whereParametrValue,
fetchNum):
try:
dbConn = psycopg2.connect(
DBGeneral.return_db_native_connection('em_db'))
dbConn = dbConn
cur = dbConn.cursor()
queryStr = "select %s from %s Where %s = '%s';" % (
queryColumnName, queryTableName, whereParametrType,
whereParametrValue)
logger.debug("Query : " + queryStr)
cur.execute(queryStr)
if (fetchNum == 0):
result = str(cur.fetchall())
elif (fetchNum == 1):
result = str(cur.fetchone())
if(result.find("',)") != -1): # formatting strings e.g uuid
result = result.partition('\'')[-1].rpartition('\'')[0]
elif(result.find(",)") != -1): # formatting ints e.g id
result = result.partition('(')[-1].rpartition(',')[0]
dbConn.close()
logger.debug("Query result: " + str(result))
return result
# If failed - count the failure and add the error to list of errors.
except BaseException:
errorMsg = "select_where FAILED "
raise Exception(errorMsg, "select_where")
@staticmethod
def update_checklist_to_review_state(queryTableName):
try:
dbConn = psycopg2.connect(
DBGeneral.return_db_native_connection('em_db'))
dbConn = dbConn
cur = dbConn.cursor()
queryStr = "UPDATE ice_checklist SET state='review' Where " +\
"name= '%s' and state= 'pending';" % (
queryTableName)
logger.debug("Query : " + queryStr)
cur.execute(queryStr)
dbConn.commit()
dbConn.close()
# If failed - count the failure and add the error to list of errors.
except BaseException:
errorMsg = "Could not Update User"
raise Exception(errorMsg, "Update")
@staticmethod
def update_all_decisions_to_approve(whereParametrValue):
try:
dbConn = psycopg2.connect(
DBGeneral.return_db_native_connection('em_db'))
dbConn = dbConn
cur = dbConn.cursor()
queryStr = "UPDATE ice_checklist_decision SET " +\
"review_value='approved' , peer_review_value='approved' " +\
"Where checklist_id = '%s';" % (
whereParametrValue)
logger.debug(queryStr)
cur.execute(queryStr)
dbConn.commit()
logger.debug("Query : " + queryStr)
# If failed - count the failure and add the error to list of errors.
except Exception as e:
errorMsg = "Could not Update User"
logger.debug(e)
raise Exception(errorMsg, "Update")
finally:
dbConn.close()
@staticmethod
def is_archive(checklistName):
try:
result = False
# Fetch all AT&T user ID.
checklist_ids = DBGeneral.select_where(
"uuid", "ice_checklist", "name", checklistName, 0)
# checklist_ids = DBGeneral.list_format(checklist_ids)
for checklist_id in checklist_ids: # Second Example
if isinstance(checklist_id, tuple):
checklist_id = checklist_id[0]
state = DBGeneral.select_where(
"state", "ice_checklist", "uuid", checklist_id, 1)
if state == "archive":
result = True
break
return result
# If failed - count the failure and add the error to list of errors.
except BaseException:
errorMsg = "is_archive FAILED "
raise Exception(errorMsg, "is_archive")
@staticmethod
def get_pr_email(checklistUuid):
try:
# Fetch one AT&T user ID.
owner_id = DBChecklist.select_where_pr_state(
"owner_id", "ice_checklist", "uuid", checklistUuid, 1)
engLeadEmail = DBGeneral.select_where(
"email", "ice_user_profile", "id", owner_id, 1)
logger.debug("get_pr_email = " + engLeadEmail)
return engLeadEmail
# If failed - count the failure and add the error to list of errors.
except Exception as e:
errorMsg = "get_pr_email FAILED " + str(e)
raise Exception(errorMsg, "get_pr_email")
@staticmethod
def get_admin_email(checklistUuid):
try:
# Fetch one AT&T user ID.
owner_id = DBChecklist.select_where_approval_state(
"owner_id", "ice_checklist", "uuid", checklistUuid, 1)
engLeadEmail = DBGeneral.select_where(
"email", "ice_user_profile", "id", owner_id, 1)
logger.debug("get_admin_email = " + engLeadEmail)
return engLeadEmail
# If failed - count the failure and add the error to list of errors.
except BaseException:
errorMsg = "get_admin_email FAILED "
raise Exception(errorMsg, "get_admin_email")
@staticmethod
def get_owner_email(checklistUuid):
try:
# Fetch one AT&T user ID.
owner_id = DBChecklist.select_native_where(
"owner_id", "ice_checklist", "uuid", checklistUuid, 1)
engLeadEmail = DBGeneral.select_where(
"email", "ice_user_profile", "id", owner_id, 1)
logger.debug("getPreeReviewerEngLeadEmail = " + engLeadEmail)
return engLeadEmail
# If failed - count the failure and add the error to list of errors.
except BaseException:
errorMsg = "get_admin_email FAILED "
raise Exception(errorMsg, "get_owner_email")
@staticmethod
def update_decisions(checklistUuid, checklistName):
checklistTempid = DBGeneral.select_where(
"template_id", "ice_checklist", "name", checklistName, 1)
checklistLineItems = DBGeneral.select_where_and(
"uuid",
"ice_checklist_line_item",
"line_type",
"auto",
"template_id",
checklistTempid,
0)
for lineItem in checklistLineItems:
setParametrType2 = "peer_review_value"
setParametrValue2 = "approved"
whereParametrType2 = "lineitem_id"
whereParametrValue2 = lineItem
DBGeneral.update_where_and(
"ice_checklist_decision",
"review_value",
checklistUuid,
"approved",
"checklist_id",
setParametrType2,
setParametrValue2,
whereParametrType2,
whereParametrValue2)
@staticmethod
def checkChecklistIsUpdated():
query = "select uuid from ice_checklist_section where template_id " +\
"in (select template_id from ice_checklist_template where " +\
"name='{template_name}') and name='{section_name}'".format(
template_name=Constants.Dashboard.LeftPanel.
EditChecklistTemplate.HEAT, section_name=Constants.
Dashboard.LeftPanel.EditChecklistTemplate.HEAT)
return DBGeneral.select_query(query)
@staticmethod
def fetchEngByVfName(vfName):
# Fetch one AT&T user ID.
return DBGeneral.select_where(
"engagement_id", "ice_vf", "name", vfName, 1)
@staticmethod
def fetchEngManIdByEngUuid(engagement_id):
return DBGeneral.select_where(
"engagement_manual_id",
"ice_engagement",
"uuid",
engagement_id,
1)
@staticmethod
def fetchChecklistByName(checklistName):
query = "select uuid from ice_checklist where " +\
"name='{cl_name}'".format(
cl_name=checklistName)
return DBGeneral.select_query(query)
@staticmethod
def create_default_heat_teampleate():
template_query = "INSERT INTO public.ice_checklist_template(uuid, " +\
"name, category, version, create_time)"\
"VALUES ('%s', '%s', '%s', '%s', '%s');" % (
str(uuid4()), 'Editing Heat', 'first category', '1',
timezone.now())
DBGeneral.insert_query(template_query)
template_id = DBGeneral.select_query(
"SELECT uuid FROM public.ice_checklist_template where " +
"name = 'Editing Heat'")
# SECTIONS
section1_query = "INSERT INTO public.ice_checklist_section(uuid, " +\
"name, weight, description, validation_instructions, " +\
"create_time, template_id) "\
"VALUES ('%s', '%s', '%s', '%s', '%s','%s', '%s');" % (
str(uuid4()), 'External References',
'1', 'section descripyion', 'valid instructions',
timezone.now(), template_id)
DBGeneral.insert_query(section1_query)
section1_id = DBGeneral.select_query(
("""SELECT uuid FROM public.ice_checklist_section """ +
"""where name = 'External References' """ +
"""and template_id = '{s}'""").format(
s=template_id))
section2_query = "INSERT INTO public.ice_checklist_section(uuid, " +\
"name, weight, description, validation_instructions, " +\
"create_time, template_id) "\
"VALUES ('%s', '%s', '%s', '%s', '%s','%s', '%s');" % (
str(uuid4()), 'Parameter Specification',
'2', 'section descripyion', 'valid instructions',
timezone.now(), template_id)
DBGeneral.insert_query(section2_query)
section2_id = DBGeneral.select_query(
("""SELECT uuid FROM public.ice_checklist_section """ +
"""where name = """ +
"""'Parameter Specification' and template_id = '{s}'""").format(
s=template_id))
# Line items
line_item1 = \
"INSERT INTO public.ice_checklist_line_item(uuid, " +\
"name, weight, description, line_type, validation_instructions," +\
"create_time,section_id, template_id) "\
"VALUES ('%s', '%s', " % (str(uuid4()), 'Normal references') +\
"'%s', " % '1' +\
"'%s'," % 'Numeric parameters should include ' +\
'range and/or allowed values.' +\
" '%s'," % 'manual', +\
"'%s'" % 'Here are some useful tips ' +\
'for how to validate this item ' +\
'in the most awesome way: