aboutsummaryrefslogtreecommitdiffstats
path: root/services/helper.py
diff options
context:
space:
mode:
Diffstat (limited to 'services/helper.py')
-rw-r--r--services/helper.py319
1 files changed, 164 insertions, 155 deletions
diff --git a/services/helper.py b/services/helper.py
index 58769fe..418b260 100644
--- a/services/helper.py
+++ b/services/helper.py
@@ -1,5 +1,5 @@
-# ============LICENSE_START==========================================
+# ============LICENSE_START==========================================
# org.onap.vvp/test-engine
# ===================================================================
# Copyright © 2017 AT&T Intellectual Property. All rights reserved.
@@ -54,157 +54,166 @@ from services.logging_service import LoggingServiceFactory
from utils.authentication import JWTAuthentication
-logger = LoggingServiceFactory.get_logger()
-
-class Helper:
-
- tc = unittest.TestCase('__init__')
-
- @staticmethod
- def rand_string(type_of_value='randomString', numberOfDigits=0):
- letters_and_numbers = string.ascii_letters + string.digits
- if type_of_value == 'email':
- myEmail = ''.join(random.choice(letters_and_numbers) for _ in range(
- 4)) + "@" + ''.join(random.choice(string.ascii_uppercase) for _ in range(4)) + ".com"
- return "ST" + myEmail
- elif type_of_value == 'randomNumber':
- randomNumber = ''.join("%s" % random.randint(2, 9)
- for _ in range(0, (numberOfDigits + 1)))
- return randomNumber
- elif type_of_value == 'randomString':
- randomString = "".join(random.sample(letters_and_numbers, 5))
- return "ST" + randomString
- else:
- raise Exception("invalid rand type")
-
- @staticmethod
- def rand_invite_email():
- inviteEmail = "automationqatt" + \
- Helper.rand_string("randomString") + "@gmail.com"
- return inviteEmail
-
- @staticmethod
- def generate_sshpub_key():
- try:
- logger.debug("About to generate SSH Public Key")
- key = rsa.generate_private_key(
- public_exponent=65537,
- key_size=2048,
- backend=default_backend()
- )
- private_key = key.private_bytes(
- encoding=crypto_serialization.Encoding.PEM,
- format=crypto_serialization.PrivateFormat.TraditionalOpenSSL,
- encryption_algorithm=crypto_serialization.NoEncryption())
- public_key = key.public_key().public_bytes(
- crypto_serialization.Encoding.OpenSSH,
- crypto_serialization.PublicFormat.OpenSSH
- ).decode("utf-8")
-
- logger.debug("Generated SSH Public Key: " + public_key)
- except Exception as e: # If failed write to log the error and return 'None'.
- logger.error(e)
- logger.error("Failed to generate SSH Public Key.")
- raise e
- return public_key
-
- @staticmethod
- def check_admin_ssh_existence(path, admin_ssh):
- if admin_ssh == open(path).read().rstrip('\n'):
- logger.debug(
- "Admin SSH already defined in DB and equal to the one stored on the local system.")
- return True
- return False
-
- @staticmethod
- def get_or_create_rsa_key_for_admin():
- try: # Create pair of keys for the given user and return his public key.
- ssh_folder = Constants.Paths.SSH.PATH
- public_file = ssh_folder + "id_rsa.pub"
- privateFile = ssh_folder + "id_rsa"
- admin_ssh_exist_and_equal = False
- admin_ssh = None
- if not os.path.exists(ssh_folder):
- os.makedirs(ssh_folder)
- elif os.path.exists(public_file):
- admin_ssh = DBUser.retrieve_admin_ssh_from_db()
- admin_ssh_exist_and_equal = Helper.check_admin_ssh_existence(
- public_file, admin_ssh)
- # TODO find pending gitlab bug causing old ssh key not be updated in gitlab cache
- if False and admin_ssh_exist_and_equal:
- return admin_ssh
- else:
- logger.debug("Private key file: " + privateFile +
- "\nPublice key file: " + public_file)
- key = rsa.generate_private_key(
- backend=crypto_default_backend(),
- public_exponent=65537,
- key_size=2048
- )
- private_key = key.private_bytes(
- crypto_serialization.Encoding.PEM,
- crypto_serialization.PrivateFormat.PKCS8,
- crypto_serialization.NoEncryption()).decode("utf-8")
- public_key = key.public_key().public_bytes(
- crypto_serialization.Encoding.OpenSSH,
- crypto_serialization.PublicFormat.OpenSSH
- ).decode("utf-8")
-
- with open(privateFile, 'w') as content_file:
- os.chmod(privateFile, 0o600)
- content_file.write(private_key) # Save private key to file.
- logger.debug(
- "Private key created successfully for admin")
- user_pub_key = public_key
- with open(public_file, 'w') as content_file:
- content_file.write(public_key) # Save public key to file.
- logger.debug("Public key created successfully for admin")
- cmd = 'ssh-keyscan ' + \
- settings.GITLAB_URL[7:-1] + ' >> ' + \
- ssh_folder + 'known_hosts'
- # Create known_hosts file and add GitLab to it.
- subprocess.Popen(cmd, shell=True, stderr=subprocess.PIPE)
- logger.debug("Added GitLab to known_hosts")
- return user_pub_key
- except Exception as error:
- logger.error(
- "_-_-_-_-_- Unexpected error in get_or_create_rsa_key_for_admin: %s" % error)
- raise Exception("Failed to create SSH keys for user admin", error)
-
- @staticmethod
- def internal_assert(x, y):
- try:
- Helper.tc.assertEqual(str(x), str(y))
- except Exception as e:
- raise Exception("AssertionError: \"" + str(x) +
- "\" != \"" + str(y) + "\"", e)
-
- @staticmethod
- def internal_assert_boolean(x, y):
- try:
- Helper.tc.assertEqual(str(x), str(y))
- return True
- except Exception as e:
- raise Exception("AssertionError: \"" + str(x) +
- "\" != \"" + str(y) + "\"", e)
-
- @staticmethod
- def internal_not_equal(x, y):
- try:
- Helper.tc.assertNotEqual(x, y)
- except Exception as e:
- raise Exception("AssertionError: \"" + str(x) +
- "\" != \"" + str(y) + "\"", e)
-
- @staticmethod
- def get_reset_passw_url(email):
- jwtObj = JWTAuthentication()
- token = jwtObj.createPersonalTokenWithExpiration(email)
- resetPasswURL = Constants.Default.LoginURL.TEXT + "?t=" + token
- return resetPasswURL
-
- @staticmethod
- def assertTrue(expr, msg=None):
- """Check that the expression is true."""
- if expr != True:
- raise Exception("AssertionError: \"not expr")
+logger = LoggingServiceFactory.get_logger()
+
+
+class Helper:
+
+ tc = unittest.TestCase('__init__')
+
+ @staticmethod
+ def rand_string(type_of_value='randomString', numberOfDigits=0):
+ letters_and_numbers = string.ascii_letters + string.digits
+ if type_of_value == 'email':
+ myEmail = ''.join(random.choice(letters_and_numbers) for _ in range(
+ 4)) + "@" + ''.join(random.choice(string.ascii_uppercase) for _ in range(4)) + ".com"
+ return "ST" + myEmail
+ elif type_of_value == 'randomNumber':
+ randomNumber = ''.join("%s" % random.randint(2, 9)
+ for _ in range(0, (numberOfDigits + 1)))
+ return randomNumber
+ elif type_of_value == 'randomString':
+ randomString = "".join(random.sample(letters_and_numbers, 5))
+ return "ST" + randomString
+ else:
+ raise Exception("invalid rand type")
+
+ @staticmethod
+ def rand_invite_email():
+ inviteEmail = "automationqatt" + \
+ Helper.rand_string("randomString") + "@gmail.com"
+ return inviteEmail
+
+ @staticmethod
+ def generate_sshpub_key():
+ try:
+ logger.debug("About to generate SSH Public Key")
+ key = rsa.generate_private_key(
+ public_exponent=65537,
+ key_size=2048,
+ backend=default_backend()
+ )
+ private_key = key.private_bytes(
+ encoding=crypto_serialization.Encoding.PEM,
+ format=crypto_serialization.PrivateFormat.TraditionalOpenSSL,
+ encryption_algorithm=crypto_serialization.NoEncryption())
+ public_key = key.public_key().public_bytes(
+ crypto_serialization.Encoding.OpenSSH,
+ crypto_serialization.PublicFormat.OpenSSH
+ ).decode("utf-8")
+
+ logger.debug("Generated SSH Public Key: " + public_key)
+ # If failed write to log the error and return 'None'.
+ except Exception as e:
+ logger.error(e)
+ logger.error("Failed to generate SSH Public Key.")
+ raise e
+ return public_key
+
+ @staticmethod
+ def check_admin_ssh_existence(path, admin_ssh):
+ if admin_ssh == open(path).read().rstrip('\n'):
+ logger.debug(
+ "Admin SSH already defined in DB and equal to the one stored on the local system.")
+ return True
+ return False
+
+ @staticmethod
+ def get_or_create_rsa_key_for_admin():
+ # Create pair of keys for the given user and return his public key.
+ try:
+ ssh_folder = Constants.Paths.SSH.PATH
+ public_file = ssh_folder + "id_rsa.pub"
+ privateFile = ssh_folder + "id_rsa"
+ admin_ssh_exist_and_equal = False
+ admin_ssh = None
+ if not os.path.exists(ssh_folder):
+ os.makedirs(ssh_folder)
+ elif os.path.exists(public_file):
+ admin_ssh = DBUser.retrieve_admin_ssh_from_db()
+ admin_ssh_exist_and_equal = Helper.check_admin_ssh_existence(
+ public_file, admin_ssh)
+ # TODO find pending gitlab bug causing old ssh key not be updated
+ # in gitlab cache
+ if False and admin_ssh_exist_and_equal:
+ return admin_ssh
+ else:
+ logger.debug("Private key file: " + privateFile +
+ "\nPublice key file: " + public_file)
+ key = rsa.generate_private_key(
+ backend=crypto_default_backend(),
+ public_exponent=65537,
+ key_size=2048
+ )
+ private_key = key.private_bytes(
+ crypto_serialization.Encoding.PEM,
+ crypto_serialization.PrivateFormat.PKCS8,
+ crypto_serialization.NoEncryption()).decode("utf-8")
+ public_key = key.public_key().public_bytes(
+ crypto_serialization.Encoding.OpenSSH,
+ crypto_serialization.PublicFormat.OpenSSH
+ ).decode("utf-8")
+
+ with open(privateFile, 'w') as content_file:
+ os.chmod(privateFile, 0o600)
+ # Save private key to file.
+ content_file.write(private_key)
+ logger.debug(
+ "Private key created successfully for admin")
+ user_pub_key = public_key
+ with open(public_file, 'w') as content_file:
+ content_file.write(public_key) # Save public key to file.
+ logger.debug("Public key created successfully for admin")
+ cmd = 'ssh-keyscan ' + \
+ settings.GITLAB_URL[7:-1] + ' >> ' + \
+ ssh_folder + 'known_hosts'
+ # Create known_hosts file and add GitLab to it.
+ subprocess.Popen(cmd, shell=True, stderr=subprocess.PIPE)
+ logger.debug("Added GitLab to known_hosts")
+ return user_pub_key
+ except Exception as error:
+ logger.error(
+ "_-_-_-_-_- Unexpected error in get_or_create_rsa_key_for_admin: %s" % error)
+ raise Exception("Failed to create SSH keys for user admin", error)
+
+ @staticmethod
+ def internal_assert(x, y):
+ try:
+ Helper.tc.assertEqual(str(x), str(y))
+ except Exception as e:
+ raise Exception("AssertionError: \"" + str(x) +
+ "\" != \"" + str(y) + "\"", e)
+
+ @staticmethod
+ def internal_assert_boolean(x, y):
+ try:
+ Helper.tc.assertEqual(str(x), str(y))
+ return True
+ except Exception as e:
+ raise Exception("AssertionError: \"" + str(x) +
+ "\" != \"" + str(y) + "\"", e)
+
+ @staticmethod
+ def internal_assert_boolean_true_false(x, y):
+ return Helper.tc.assertEqual(str(x), str(y))
+
+ @staticmethod
+ def internal_not_equal(x, y):
+ try:
+ Helper.tc.assertNotEqual(x, y)
+ except Exception as e:
+ raise Exception("AssertionError: \"" + str(x) +
+ "\" != \"" + str(y) + "\"", e)
+
+ @staticmethod
+ def get_reset_passw_url(email):
+ jwtObj = JWTAuthentication()
+ token = jwtObj.createPersonalTokenWithExpiration(email)
+ resetPasswURL = Constants.Default.LoginURL.TEXT + "?t=" + token
+ return resetPasswURL
+
+ @staticmethod
+ def assertTrue(expr, msg=None):
+ """Check that the expression is true."""
+ if expr != True:
+ raise Exception("AssertionError: \"not expr")