aboutsummaryrefslogtreecommitdiffstats
path: root/services/database/db_general.py
diff options
context:
space:
mode:
Diffstat (limited to 'services/database/db_general.py')
-rwxr-xr-xservices/database/db_general.py416
1 files changed, 416 insertions, 0 deletions
diff --git a/services/database/db_general.py b/services/database/db_general.py
new file mode 100755
index 0000000..c850d3a
--- /dev/null
+++ b/services/database/db_general.py
@@ -0,0 +1,416 @@
+
+# ============LICENSE_START==========================================
+# org.onap.vvp/test-engine
+# ===================================================================
+# Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+# ===================================================================
+#
+# Unless otherwise specified, all software contained herein is licensed
+# under the Apache License, Version 2.0 (the “License”);
+# you may not use this software except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+#
+#
+# Unless otherwise specified, all documentation contained herein is licensed
+# under the Creative Commons License, Attribution 4.0 Intl. (the “License”);
+# you may not use this documentation except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# https://creativecommons.org/licenses/by/4.0/
+#
+# Unless required by applicable law or agreed to in writing, documentation
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# ============LICENSE_END============================================
+#
+# ECOMP is a trademark and service mark of AT&T Intellectual Property.
+from datetime import datetime
+import sqlite3
+
+from django.conf import settings
+from django.db import transaction
+import psycopg2
+
+from services.logging_service import LoggingServiceFactory
+
+
+logger = LoggingServiceFactory.get_logger()
+
+class DBGeneral:
+
+ @staticmethod
+ # desigredDB: Use 'default' for CI General and 'em_db' for EM General
+ # (according to settings.DATABASES).
+ def return_db_native_connection(desigredDB):
+ dbConnectionStr = "dbname='" + str(settings.SINGLETONE_DB[desigredDB]['NAME']) + \
+ "' user='" + str(settings.SINGLETONE_DB[desigredDB]['USER']) + \
+ "' host='" + str(settings.SINGLETONE_DB[desigredDB]['HOST']) + \
+ "' password='" + str(settings.SINGLETONE_DB[desigredDB]['PASSWORD']) + \
+ "' port='" + \
+ str(settings.SINGLETONE_DB[desigredDB]['PORT']) + "'"
+ return dbConnectionStr
+
+ @staticmethod
+ def insert_results(testType, testFeature, testResult, testName, testDuration, notes=" "):
+ try:
+ if settings.DATABASE_TYPE == 'sqlite':
+ dbfile = str(settings.DATABASES['default']['TEST_NAME'])
+ dbConn = sqlite3.connect(dbfile)
+ cur = dbConn.cursor()
+ else:
+ # Connect to General 'default'.
+ dbConn = psycopg2.connect(
+ DBGeneral.return_db_native_connection("default"))
+ dbConn = dbConn
+ cur = dbConn.cursor()
+ except Exception as e:
+ errorMsg = "Failed to create connection to General." + str(e)
+ raise Exception(errorMsg)
+ try: # Create INSERT query.
+ if settings.DATABASE_TYPE == 'sqlite':
+ query_str = 'INSERT INTO ice_test_results (testType, testFeature, testResult, testName, notes,'\
+ 'create_time, build_id, duration) VALUES (?, ?, ?, ?, ?, ?, ?, ?);'
+ else:
+ query_str = 'INSERT INTO ice_test_results ("testType", "testFeature", "testResult", "testName", notes,'\
+ 'create_time, build_id, duration) VALUES (%s, %s, %s, %s, %s, %s, %s, %s);'
+ cur.execute(query_str, (testType, testFeature, testResult, testName, notes, str(datetime.now()),
+ settings.ICE_BUILD_REPORT_NUM, testDuration))
+ dbConn.commit()
+ logger.debug("Test result in DB - " + testResult)
+ except Exception as e:
+ logger.error(e)
+ errorMsg = "Failed to insert results to DB." + str(e)
+ raise Exception(errorMsg)
+ dbConn.close()
+
+ @staticmethod
+ def select_query(queryStr, return_type="str", fetch_num=1):
+ try:
+ dbConn = psycopg2.connect(
+ DBGeneral.return_db_native_connection('em_db'))
+ dbConn = dbConn
+ cur = dbConn.cursor()
+ logger.debug("Query: " + queryStr)
+ cur.execute(queryStr)
+ if return_type == "str":
+ if fetch_num == 1:
+ result = str(cur.fetchone())
+ else:
+ result = str(cur.fetchall())
+ if result != 'None':
+ # formatting strings e.g uuid
+ if(result.find("',)") != -1):
+ result = result.partition('\'')[-1].rpartition('\'')[0]
+ elif(result.find(",)") != -1): # formatting ints e.g id
+ result = result.partition('(')[-1].rpartition(',')[0]
+ if return_type == "list":
+ if fetch_num == 1:
+ result = list(cur.fetchone())
+ else:
+ result = [item[0] for item in cur.fetchall()]
+ dbConn.close()
+ logger.debug("Query result: " + str(result))
+ return result
+ except:
+ raise Exception("Couldn't fetch answer using the given query.")
+
+ @staticmethod
+ def insert_query(queryStr):
+ try:
+ nativeIceDb = psycopg2.connect(
+ DBGeneral.return_db_native_connection('em_db'))
+ dbConn = nativeIceDb
+ cur = dbConn.cursor()
+ logger.debug("Query: " + queryStr)
+ cur.execute(queryStr)
+ dbConn.commit()
+ dbConn.close()
+ logger.debug("Insert query success!")
+ # If failed - count the failure and add the error to list of errors.
+ except Exception as e:
+ logger.error(e)
+ transaction.rollback()
+ raise Exception("Couldn't fetch answer using the given query.")
+
+ @staticmethod
+ def update_query(queryStr):
+ try:
+ nativeIceDb = psycopg2.connect(
+ DBGeneral.return_db_native_connection('em_db'))
+ dbConn = nativeIceDb
+ cur = dbConn.cursor()
+ logger.debug("Query: " + queryStr)
+ cur.execute(queryStr)
+ dbConn.commit()
+ dbConn.close()
+ logger.debug("Update query success!")
+ # If failed - count the failure and add the error to list of errors.
+ except Exception as e:
+ logger.error(e)
+ transaction.rollback()
+ raise Exception("Couldn't fetch answer using the given query.")
+
+ @staticmethod
+ def select_where_email(queryColumnName, queryTableName, email):
+ try:
+ dbConn = psycopg2.connect(
+ DBGeneral.return_db_native_connection('em_db'))
+ dbConn = dbConn
+ cur = dbConn.cursor()
+ queryStr = "select %s from %s WHERE Email = '%s';" % (
+ queryColumnName, queryTableName, email)
+ logger.debug("Query : " + queryStr)
+ cur.execute(queryStr)
+ result = str(cur.fetchone())
+ if(result.find("',)") != -1): # formatting strings e.g uuid
+ result = result.partition('\'')[-1].rpartition('\'')[0]
+ elif(result.find(",)") != -1): # formatting ints e.g id
+ result = result.partition('(')[-1].rpartition(',')[0]
+ dbConn.close()
+ logger.debug("Query result: " + str(result))
+ return result
+ # If failed - count the failure and add the error to list of errors.
+ except:
+ errorMsg = "select_where_email FAILED "
+ raise Exception(errorMsg, "select_where_email")
+ raise
+
+ @staticmethod
+ def select_from(queryColumnName, queryTableName, fetchNum):
+ try:
+ dbConn = psycopg2.connect(
+ DBGeneral.return_db_native_connection('em_db'))
+ cur = dbConn.cursor()
+ queryStr = "select %s from %s;" % (queryColumnName, queryTableName)
+ logger.debug("Query : " + queryStr)
+ cur.execute(queryStr)
+ if (fetchNum == 0):
+ result = str(cur.fetchall())
+ elif (fetchNum == 1):
+ result = str(cur.fetchone())
+ if(result.find("',)") != -1): # formatting strings e.g uuid
+ result = result.partition('\'')[-1].rpartition('\'')[0]
+ elif(result.find(",)") != -1): # formatting ints e.g id
+ result = result.partition('(')[-1].rpartition(',')[0]
+ dbConn.close()
+ logger.debug("Query result: " + str(result))
+ return result
+ # If failed - count the failure and add the error to list of errors.
+ except Exception as e:
+ errorMsg = "select_from FAILED " + str(e)
+ raise Exception(errorMsg, "select_from")
+
+ @staticmethod
+ def select_where(queryColumnName, queryTableName, whereParametrType, whereParametrValue, fetchNum):
+ try:
+ dbConn = psycopg2.connect(
+ DBGeneral.return_db_native_connection('em_db'))
+ cur = dbConn.cursor()
+ queryStr = "select %s from %s Where %s = '%s';" % (
+ queryColumnName, queryTableName, whereParametrType, whereParametrValue)
+ logger.debug("Query : " + queryStr)
+ cur.execute(queryStr)
+ if (fetchNum == 0):
+ result = list(cur.fetchall())
+ elif (fetchNum == 1):
+ result = str(cur.fetchone())
+ if(result.find("',)") != -1): # formatting strings e.g uuid
+ result = result.partition('\'')[-1].rpartition('\'')[0]
+ elif(result.find(",)") != -1): # formatting ints e.g id
+ result = result.partition('(')[-1].rpartition(',')[0]
+ dbConn.close()
+ logger.debug("Query result: " + str(result))
+ return result
+ # If failed - count the failure and add the error to list of errors.
+ except:
+ errorMsg = "select_where FAILED "
+ raise Exception(errorMsg, "select_where")
+
+ @staticmethod
+ def select_where_order_by_desc(queryColumnName, queryTableName, whereParametrType, whereParametrValue, order_by):
+ dbConn = psycopg2.connect(
+ DBGeneral.return_db_native_connection('em_db'))
+ cur = dbConn.cursor()
+ queryStr = "select %s from %s Where %s = '%s' order by %s desc limit 1;" \
+ % (queryColumnName, queryTableName, whereParametrType, whereParametrValue, order_by)
+ logger.debug("Query : " + queryStr)
+ cur.execute(queryStr)
+ result = str(cur.fetchall())
+ result = DBGeneral.list_format(result)
+ dbConn.close()
+ return result
+
+ @staticmethod
+ def select_where_dict(queryColumnName, queryTableName, whereParametrType):
+ dbConn = psycopg2.connect(
+ DBGeneral.return_db_native_connection('em_db'))
+ cur = dbConn.cursor()
+ x = ""
+ count = 0
+ for key, val in whereParametrType.items():
+ x += "%s='%s'" % (key, val)
+ if len(whereParametrType.items()) - count > 1:
+ x += ' and '
+ count += 1
+ queryStr = "select %s from %s Where %s;" \
+ % (queryColumnName, queryTableName, x)
+ logger.debug("Query : " + queryStr)
+ cur.execute(queryStr)
+ result = str(cur.fetchall())
+ result = DBGeneral.list_format(result)
+ dbConn.close()
+ return result
+
+ @staticmethod
+ def select_where_not_and_order_by_desc(queryColumnName, queryTableName, whereParametrType,
+ whereParametrValue, parametrTypeAnd, parametrAnd, order_by):
+ dbConn = psycopg2.connect(
+ DBGeneral.return_db_native_connection('em_db'))
+ cur = dbConn.cursor()
+ queryStr = "select %s from %s Where %s = '%s' and %s != '%s' order by %s desc limit 1;" \
+ % (queryColumnName, queryTableName, whereParametrType, whereParametrValue,
+ parametrTypeAnd, parametrAnd, order_by)
+ logger.debug("Query : " + queryStr)
+ cur.execute(queryStr)
+ result = str(cur.fetchall())
+ result = DBGeneral.list_format(result)
+ dbConn.close()
+ return result
+
+ @staticmethod
+ def select_where_and(queryColumnName, queryTableName, whereParametrType, whereParametrValue,
+ parametrTypeAnd, parametrAnd, fetchNum):
+ try:
+ dbConn = psycopg2.connect(
+ DBGeneral.return_db_native_connection('em_db'))
+ dbConn = dbConn
+ cur = dbConn.cursor()
+ queryStr = "select %s from %s Where %s = '%s' and %s = '%s';" % (
+ queryColumnName, queryTableName, whereParametrType, whereParametrValue, parametrTypeAnd, parametrAnd)
+ logger.debug("Query : " + queryStr)
+ cur.execute(queryStr)
+ if (fetchNum == 0):
+ result = str(cur.fetchall())
+ result = DBGeneral.list_format(result)
+ elif (fetchNum == 1):
+ result = str(cur.fetchone())
+ if(result.find("',)") != -1): # formatting strings e.g uuid
+ result = result.partition('\'')[-1].rpartition('\'')[0]
+ elif(result.find(",)") != -1): # formatting ints e.g id
+ result = result.partition('(')[-1].rpartition(',')[0]
+ dbConn.close()
+ logger.debug("Query result: " + str(result))
+ return result
+ # If failed - count the failure and add the error to list of errors.
+ except:
+ errorMsg = "select_where_and FAILED "
+ raise Exception(errorMsg, "select_where_and")
+
+ @staticmethod
+ def select_where_is_bigger(queryColumnName, queryTableName, whereParametrType, whereParametrValue, parametrTypeAnd, parametrAnd, fetchNum):
+ try:
+ dbConn = psycopg2.connect(
+ DBGeneral.return_db_native_connection('em_db'))
+ dbConn = dbConn
+ cur = dbConn.cursor()
+ queryStr = "select %s from %s Where %s = '%s' and %s > %s;" % (
+ queryColumnName, queryTableName, whereParametrType, whereParametrValue, parametrTypeAnd, parametrAnd)
+ logger.debug("Query : " + queryStr)
+ cur.execute(queryStr)
+ if (fetchNum == 0):
+ result = cur.fetchall()
+ elif (fetchNum == 1):
+ result = cur.fetchone()
+ if(result.find("',)") != -1): # formatting strings e.g uuid
+ result = result.partition('\'')[-1].rpartition('\'')[0]
+ elif(result.find(",)") != -1): # formatting ints e.g id
+ result = result.partition('(')[-1].rpartition(',')[0]
+ dbConn.close()
+ return result
+ # If failed - count the failure and add the error to list of errors.
+ except:
+ errorMsg = "select_where_is_bigger FAILED "
+ raise Exception(errorMsg, "select_where_is_bigger")
+
+ @staticmethod
+ def update_where(queryTableName, setParametrType, setparametrValue, whereParametrType, whereParametrValue):
+ try:
+ dbConn = psycopg2.connect(
+ DBGeneral.return_db_native_connection('em_db'))
+ dbConn = dbConn
+ cur = dbConn.cursor()
+ queryStr = "UPDATE %s SET %s = '%s' Where %s = '%s';" % (
+ queryTableName, setParametrType, setparametrValue, whereParametrType, whereParametrValue)
+ cur.execute(queryStr)
+ dbConn.commit()
+ logger.debug("Query : " + queryStr)
+ # If failed - count the failure and add the error to list of errors.
+ except Exception as e:
+ errorMsg = "Could not Update User"
+ logger.debug(e)
+ raise Exception(errorMsg, "Update")
+ finally:
+ dbConn.close()
+
+ @staticmethod
+ def update_by_query(queryStr):
+ try:
+ dbConn = psycopg2.connect(
+ DBGeneral.return_db_native_connection('em_db'))
+ dbConn = dbConn
+ cur = dbConn.cursor()
+ logger.debug("Query : " + queryStr)
+ cur.execute(queryStr)
+ dbConn.commit()
+ dbConn.close()
+ # If failed - count the failure and add the error to list of errors.
+ except:
+ errorMsg = "Could not Update User"
+ raise Exception(errorMsg, "Update")
+
+ @staticmethod
+ def update_where_and(queryTableName, setParametrType, parametrValue, changeToValue, whereParametrType, setParametrType2, setParametrValue2, whereParametrType2, whereParametrValue2):
+ try:
+ dbConn = psycopg2.connect(
+ DBGeneral.return_db_native_connection('em_db'))
+ dbConn = dbConn
+ cur = dbConn.cursor()
+ queryStr = "UPDATE %s SET %s = '%s', %s = '%s' Where %s = '%s' and %s = '%s';" % (
+ queryTableName, setParametrType, changeToValue, setParametrType2, setParametrValue2, whereParametrType, parametrValue, whereParametrType2, whereParametrValue2)
+ logger.debug("Query : " + queryStr)
+ cur.execute(queryStr)
+ dbConn.commit()
+ dbConn.close()
+ # If failed - count the failure and add the error to list of errors.
+ except:
+ errorMsg = "Could not Update User"
+ raise Exception(errorMsg, "Update")
+
+ @staticmethod
+ def list_format(un_listed):
+ un_listed = un_listed[1:-1]
+ un_listed = un_listed.replace("',), ('", "|||")
+ un_listed = un_listed.replace("(u'", "") # Format list
+ un_listed = un_listed[1:-1].replace("('", "") # Format list
+ un_listed = un_listed.replace("',)", "") # Format list
+ listed = un_listed[1:-2].split("|||")
+ return listed
+
+ @staticmethod
+ def get_vendors_list():
+ # Select approved vendors from db.
+ vendors_list = DBGeneral.select_where(
+ "name", "ice_vendor", "public", "TRUE", 0)
+ return vendors_list