summaryrefslogtreecommitdiffstats
path: root/services/database/db_general.py
diff options
context:
space:
mode:
Diffstat (limited to 'services/database/db_general.py')
-rwxr-xr-xservices/database/db_general.py485
1 files changed, 0 insertions, 485 deletions
diff --git a/services/database/db_general.py b/services/database/db_general.py
deleted file mode 100755
index 2c83fb0..0000000
--- a/services/database/db_general.py
+++ /dev/null
@@ -1,485 +0,0 @@
-# ============LICENSE_START==========================================
-# org.onap.vvp/test-engine
-# ===================================================================
-# Copyright © 2017 AT&T Intellectual Property. All rights reserved.
-# ===================================================================
-#
-# Unless otherwise specified, all software contained herein is licensed
-# under the Apache License, Version 2.0 (the “License”);
-# you may not use this software except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-#
-#
-# Unless otherwise specified, all documentation contained herein is licensed
-# under the Creative Commons License, Attribution 4.0 Intl. (the “License”);
-# you may not use this documentation except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# https://creativecommons.org/licenses/by/4.0/
-#
-# Unless required by applicable law or agreed to in writing, documentation
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-# ============LICENSE_END============================================
-#
-# ECOMP is a trademark and service mark of AT&T Intellectual Property.
-from datetime import datetime
-import sqlite3
-
-from django.conf import settings
-from django.db import transaction
-import psycopg2
-
-from services.logging_service import LoggingServiceFactory
-
-
-logger = LoggingServiceFactory.get_logger()
-
-
-class DBGeneral:
-
- @staticmethod
- # desigredDB: Use 'default' for CI General and 'em_db' for EM General
- # (according to settings.DATABASES).
- def return_db_native_connection(desigredDB):
- dbConnectionStr = "dbname='" + str(
- settings.SINGLETONE_DB[desigredDB]['NAME']) + \
- "' user='" + str(settings.SINGLETONE_DB[desigredDB]['USER']) + \
- "' host='" + str(settings.SINGLETONE_DB[desigredDB]['HOST']) + \
- "' password='" + str(
- settings.SINGLETONE_DB[desigredDB]['PASSWORD']) + \
- "' port='" + \
- str(settings.SINGLETONE_DB[desigredDB]['PORT']) + "'"
- return dbConnectionStr
-
- @staticmethod
- def insert_results(
- testType,
- testFeature,
- testResult,
- testName,
- testDuration,
- notes=" "):
- try:
- if settings.DATABASE_TYPE == 'sqlite':
- dbfile = str(settings.DATABASES['default']['TEST_NAME'])
- dbConn = sqlite3.connect(dbfile)
- cur = dbConn.cursor()
- else:
- # Connect to General 'default'.
- dbConn = psycopg2.connect(
- DBGeneral.return_db_native_connection("default"))
- dbConn = dbConn
- cur = dbConn.cursor()
- except Exception as e:
- errorMsg = "Failed to create connection to General." + str(e)
- raise Exception(errorMsg)
- try: # Create INSERT query.
- if settings.DATABASE_TYPE == 'sqlite':
- query_str = 'INSERT INTO ice_test_results ' +\
- '(testType, testFeature, testResult, testName, notes,'\
- 'create_time, build_id, duration) VALUES ' +\
- '(?, ?, ?, ?, ?, ?, ?, ?);'
- else:
- query_str = 'INSERT INTO ice_test_results ("testType", ' +\
- '"testFeature", "testResult", "testName", notes,'\
- 'create_time, build_id, duration) VALUES ' +\
- '(%s, %s, %s, %s, %s, %s, %s, %s);'
- cur.execute(query_str, (testType, testFeature, testResult,
- testName, notes,
- str(datetime.now()),
- settings.ICE_BUILD_REPORT_NUM,
- testDuration))
- dbConn.commit()
- logger.debug("Test result in DB - " + testResult)
- except Exception as e:
- logger.error(e)
- errorMsg = "Failed to insert results to DB." + str(e)
- raise Exception(errorMsg)
- dbConn.close()
-
- @staticmethod
- def select_query(queryStr, return_type="str", fetch_num=1):
- try:
- dbConn = psycopg2.connect(
- DBGeneral.return_db_native_connection('em_db'))
- dbConn = dbConn
- cur = dbConn.cursor()
- logger.debug("Query: " + queryStr)
- cur.execute(queryStr)
- if return_type == "str":
- if fetch_num == 1:
- result = str(cur.fetchone())
- else:
- result = str(cur.fetchall())
- if result != 'None':
- # formatting strings e.g uuid
- if(result.find("',)") != -1):
- result = result.partition('\'')[-1].rpartition('\'')[0]
- elif(result.find(",)") != -1): # formatting ints e.g id
- result = result.partition('(')[-1].rpartition(',')[0]
- if return_type == "list":
- if fetch_num == 1:
- result = list(cur.fetchone())
- else:
- result = [item[0] for item in cur.fetchall()]
- dbConn.close()
- logger.debug("Query result: " + str(result))
- return result
- except BaseException:
- raise Exception("Couldn't fetch answer using the given query.")
-
- @staticmethod
- def insert_query(queryStr):
- try:
- nativeIceDb = psycopg2.connect(
- DBGeneral.return_db_native_connection('em_db'))
- dbConn = nativeIceDb
- cur = dbConn.cursor()
- logger.debug("Query: " + queryStr)
- cur.execute(queryStr)
- dbConn.commit()
- dbConn.close()
- logger.debug("Insert query success!")
- # If failed - count the failure and add the error to list of errors.
- except Exception as e:
- logger.error(e)
- transaction.rollback()
- raise Exception("Couldn't fetch answer using the given query.")
-
- @staticmethod
- def update_query(queryStr):
- try:
- nativeIceDb = psycopg2.connect(
- DBGeneral.return_db_native_connection('em_db'))
- dbConn = nativeIceDb
- cur = dbConn.cursor()
- logger.debug("Query: " + queryStr)
- cur.execute(queryStr)
- dbConn.commit()
- dbConn.close()
- logger.debug("Update query success!")
- # If failed - count the failure and add the error to list of errors.
- except Exception as e:
- logger.error(e)
- transaction.rollback()
- raise Exception("Couldn't fetch answer using the given query.")
-
- @staticmethod
- def select_where_email(queryColumnName, queryTableName, email):
- try:
- dbConn = psycopg2.connect(
- DBGeneral.return_db_native_connection('em_db'))
- dbConn = dbConn
- cur = dbConn.cursor()
- queryStr = "select %s from %s WHERE Email = '%s';" % (
- queryColumnName, queryTableName, email)
- logger.debug("Query : " + queryStr)
- cur.execute(queryStr)
- result = str(cur.fetchone())
- if(result.find("',)") != -1): # formatting strings e.g uuid
- result = result.partition('\'')[-1].rpartition('\'')[0]
- elif(result.find(",)") != -1): # formatting ints e.g id
- result = result.partition('(')[-1].rpartition(',')[0]
- dbConn.close()
- logger.debug("Query result: " + str(result))
- return result
- # If failed - count the failure and add the error to list of errors.
- except BaseException:
- errorMsg = "select_where_email FAILED "
- raise Exception(errorMsg, "select_where_email")
- raise
-
- @staticmethod
- def select_from(queryColumnName, queryTableName, fetchNum):
- try:
- dbConn = psycopg2.connect(
- DBGeneral.return_db_native_connection('em_db'))
- cur = dbConn.cursor()
- queryStr = "select %s from %s;" % (queryColumnName, queryTableName)
- logger.debug("Query : " + queryStr)
- cur.execute(queryStr)
- if (fetchNum == 0):
- result = str(cur.fetchall())
- elif (fetchNum == 1):
- result = str(cur.fetchone())
- if(result.find("',)") != -1): # formatting strings e.g uuid
- result = result.partition('\'')[-1].rpartition('\'')[0]
- elif(result.find(",)") != -1): # formatting ints e.g id
- result = result.partition('(')[-1].rpartition(',')[0]
- dbConn.close()
- logger.debug("Query result: " + str(result))
- return result
- # If failed - count the failure and add the error to list of errors.
- except Exception as e:
- errorMsg = "select_from FAILED " + str(e)
- raise Exception(errorMsg, "select_from")
-
- @staticmethod
- def select_where(
- queryColumnName,
- queryTableName,
- whereParametrType,
- whereParametrValue,
- fetchNum):
- try:
- dbConn = psycopg2.connect(
- DBGeneral.return_db_native_connection('em_db'))
- cur = dbConn.cursor()
- queryStr = "select %s from %s Where %s = '%s';" % (
- queryColumnName, queryTableName, whereParametrType,
- whereParametrValue)
- logger.debug("Query : " + queryStr)
- cur.execute(queryStr)
- if (fetchNum == 0):
- result = list(cur.fetchall())
- elif (fetchNum == 1):
- result = str(cur.fetchone())
- if(result.find("',)") != -1): # formatting strings e.g uuid
- result = result.partition('\'')[-1].rpartition('\'')[0]
- elif(result.find(",)") != -1): # formatting ints e.g id
- result = result.partition('(')[-1].rpartition(',')[0]
- dbConn.close()
- logger.debug("Query result: " + str(result))
- return result
- # If failed - count the failure and add the error to list of errors.
- except BaseException:
- errorMsg = "select_where FAILED "
- raise Exception(errorMsg, "select_where")
-
- @staticmethod
- def select_where_order_by_desc(
- queryColumnName,
- queryTableName,
- whereParametrType,
- whereParametrValue,
- order_by):
- dbConn = psycopg2.connect(
- DBGeneral.return_db_native_connection('em_db'))
- cur = dbConn.cursor()
- queryStr = \
- "select %s from %s " % (queryColumnName, queryTableName,) +\
- "Where %s = '%s' " % (whereParametrType, whereParametrValue) +\
- "order by %s desc limit 1;" % order_by
- logger.debug("Query : " + queryStr)
- cur.execute(queryStr)
- result = str(cur.fetchall())
- result = DBGeneral.list_format(result)
- dbConn.close()
- return result
-
- @staticmethod
- def select_where_dict(queryColumnName, queryTableName, whereParametrType):
- dbConn = psycopg2.connect(
- DBGeneral.return_db_native_connection('em_db'))
- cur = dbConn.cursor()
- x = ""
- count = 0
- for key, val in whereParametrType.items():
- x += "%s='%s'" % (key, val)
- if len(whereParametrType.items()) - count > 1:
- x += ' and '
- count += 1
- queryStr = "select %s from %s Where %s;" \
- % (queryColumnName, queryTableName, x)
- logger.debug("Query : " + queryStr)
- cur.execute(queryStr)
- result = str(cur.fetchall())
- result = DBGeneral.list_format(result)
- dbConn.close()
- return result
-
- @staticmethod
- def select_where_not_and_order_by_desc(
- queryColumnName,
- queryTableName,
- whereParametrType,
- whereParametrValue,
- parametrTypeAnd,
- parametrAnd,
- order_by):
- dbConn = psycopg2.connect(
- DBGeneral.return_db_native_connection('em_db'))
- cur = dbConn.cursor()
- queryStr = \
- "select %s from %s " % (queryColumnName, queryTableName) +\
- "Where %s = '%s' " % (whereParametrType, whereParametrValue) +\
- "and %s != '%s' " % (parametrTypeAnd, parametrAnd) +\
- "order by %s desc limit 1;" % order_by
- logger.debug("Query : " + queryStr)
- cur.execute(queryStr)
- result = str(cur.fetchall())
- result = DBGeneral.list_format(result)
- dbConn.close()
- return result
-
- @staticmethod
- def select_where_and(
- queryColumnName,
- queryTableName,
- whereParametrType,
- whereParametrValue,
- parametrTypeAnd,
- parametrAnd,
- fetchNum):
- try:
- dbConn = psycopg2.connect(
- DBGeneral.return_db_native_connection('em_db'))
- dbConn = dbConn
- cur = dbConn.cursor()
- queryStr = "select %s from %s Where %s = '%s' and %s = '%s';" % (
- queryColumnName, queryTableName, whereParametrType,
- whereParametrValue, parametrTypeAnd, parametrAnd)
- logger.debug("Query : " + queryStr)
- cur.execute(queryStr)
- if (fetchNum == 0):
- result = str(cur.fetchall())
- result = DBGeneral.list_format(result)
- elif (fetchNum == 1):
- result = str(cur.fetchone())
- if(result.find("',)") != -1): # formatting strings e.g uuid
- result = result.partition('\'')[-1].rpartition('\'')[0]
- elif(result.find(",)") != -1): # formatting ints e.g id
- result = result.partition('(')[-1].rpartition(',')[0]
- dbConn.close()
- logger.debug("Query result: " + str(result))
- return result
- # If failed - count the failure and add the error to list of errors.
- except BaseException:
- errorMsg = "select_where_and FAILED "
- raise Exception(errorMsg, "select_where_and")
-
- @staticmethod
- def select_where_is_bigger(
- queryColumnName,
- queryTableName,
- whereParametrType,
- whereParametrValue,
- parametrTypeAnd,
- parametrAnd,
- fetchNum):
- try:
- dbConn = psycopg2.connect(
- DBGeneral.return_db_native_connection('em_db'))
- dbConn = dbConn
- cur = dbConn.cursor()
- queryStr = "select %s from %s Where %s = '%s' and %s > %s;" % (
- queryColumnName, queryTableName, whereParametrType,
- whereParametrValue, parametrTypeAnd, parametrAnd)
- logger.debug("Query : " + queryStr)
- cur.execute(queryStr)
- if (fetchNum == 0):
- result = cur.fetchall()
- elif (fetchNum == 1):
- result = cur.fetchone()
- if(result.find("',)") != -1): # formatting strings e.g uuid
- result = result.partition('\'')[-1].rpartition('\'')[0]
- elif(result.find(",)") != -1): # formatting ints e.g id
- result = result.partition('(')[-1].rpartition(',')[0]
- dbConn.close()
- return result
- # If failed - count the failure and add the error to list of errors.
- except BaseException:
- errorMsg = "select_where_is_bigger FAILED "
- raise Exception(errorMsg, "select_where_is_bigger")
-
- @staticmethod
- def update_where(
- queryTableName,
- setParametrType,
- setparametrValue,
- whereParametrType,
- whereParametrValue):
- try:
- dbConn = psycopg2.connect(
- DBGeneral.return_db_native_connection('em_db'))
- dbConn = dbConn
- cur = dbConn.cursor()
- queryStr = "UPDATE %s SET %s = '%s' Where %s = '%s';" % (
- queryTableName, setParametrType, setparametrValue,
- whereParametrType, whereParametrValue)
- cur.execute(queryStr)
- dbConn.commit()
- logger.debug("Query : " + queryStr)
- # If failed - count the failure and add the error to list of errors.
- except Exception as e:
- errorMsg = "Could not Update User"
- logger.debug(e)
- raise Exception(errorMsg, "Update")
- finally:
- dbConn.close()
-
- @staticmethod
- def update_by_query(queryStr):
- try:
- dbConn = psycopg2.connect(
- DBGeneral.return_db_native_connection('em_db'))
- dbConn = dbConn
- cur = dbConn.cursor()
- logger.debug("Query : " + queryStr)
- cur.execute(queryStr)
- dbConn.commit()
- dbConn.close()
- # If failed - count the failure and add the error to list of errors.
- except BaseException:
- errorMsg = "Could not Update User"
- raise Exception(errorMsg, "Update")
-
- @staticmethod
- def update_where_and(
- queryTableName,
- setParametrType,
- parametrValue,
- changeToValue,
- whereParametrType,
- setParametrType2,
- setParametrValue2,
- whereParametrType2,
- whereParametrValue2):
- try:
- dbConn = psycopg2.connect(
- DBGeneral.return_db_native_connection('em_db'))
- dbConn = dbConn
- cur = dbConn.cursor()
- queryStr = "UPDATE %s SET " % queryTableName +\
- "%s = '%s', " % (setParametrType, changeToValue) +\
- "%s = '%s' Where " % (setParametrType2, setParametrValue2) +\
- "%s = '%s' " % (whereParametrType, parametrValue) +\
- "and %s = '%s';" % (whereParametrType2, whereParametrValue2)
- logger.debug("Query : " + queryStr)
- cur.execute(queryStr)
- dbConn.commit()
- dbConn.close()
- # If failed - count the failure and add the error to list of errors.
- except BaseException:
- errorMsg = "Could not Update User"
- raise Exception(errorMsg, "Update")
-
- @staticmethod
- def list_format(un_listed):
- un_listed = un_listed[1:-1]
- un_listed = un_listed.replace("',), ('", "|||")
- un_listed = un_listed.replace("(u'", "") # Format list
- un_listed = un_listed[1:-1].replace("('", "") # Format list
- un_listed = un_listed.replace("',)", "") # Format list
- listed = un_listed[1:-2].split("|||")
- return listed
-
- @staticmethod
- def get_vendors_list():
- # Select approved vendors from db.
- vendors_list = DBGeneral.select_where(
- "name", "ice_vendor", "public", "TRUE", 0)
- return vendors_list