diff options
author | Edan Binshtok <eb578m@intl.att.com> | 2017-10-18 07:56:58 +0300 |
---|---|---|
committer | Edan Binshtok <eb578m@intl.att.com> | 2017-10-18 07:56:58 +0300 |
commit | 433a8256e31f755f5e236491bbe39d3db24d6d6d (patch) | |
tree | 45f483eab1ea1654ee21a3b51c5b8bf1a8ebaffa /services/helper.py | |
parent | f8907f0c4fc0ba4bb97a1d636a50c5b40c2642f2 (diff) |
Align CI test test and JJB
Add vendor agnostic CI test to align
Add Tox and maven docker
Issue Id: VVP-15
Change-Id: I69f0c1036e6f72b62bddc822544c55200af7b37d
Signed-off-by: Edan Binshtok <eb578m@intl.att.com>
Diffstat (limited to 'services/helper.py')
-rw-r--r-- | services/helper.py | 319 |
1 files changed, 164 insertions, 155 deletions
diff --git a/services/helper.py b/services/helper.py index 58769fe..418b260 100644 --- a/services/helper.py +++ b/services/helper.py @@ -1,5 +1,5 @@ -# ============LICENSE_START========================================== +# ============LICENSE_START========================================== # org.onap.vvp/test-engine # =================================================================== # Copyright © 2017 AT&T Intellectual Property. All rights reserved. @@ -54,157 +54,166 @@ from services.logging_service import LoggingServiceFactory from utils.authentication import JWTAuthentication -logger = LoggingServiceFactory.get_logger()
-
-class Helper:
-
- tc = unittest.TestCase('__init__')
-
- @staticmethod
- def rand_string(type_of_value='randomString', numberOfDigits=0):
- letters_and_numbers = string.ascii_letters + string.digits
- if type_of_value == 'email':
- myEmail = ''.join(random.choice(letters_and_numbers) for _ in range(
- 4)) + "@" + ''.join(random.choice(string.ascii_uppercase) for _ in range(4)) + ".com"
- return "ST" + myEmail
- elif type_of_value == 'randomNumber':
- randomNumber = ''.join("%s" % random.randint(2, 9)
- for _ in range(0, (numberOfDigits + 1)))
- return randomNumber
- elif type_of_value == 'randomString':
- randomString = "".join(random.sample(letters_and_numbers, 5))
- return "ST" + randomString
- else:
- raise Exception("invalid rand type")
-
- @staticmethod
- def rand_invite_email():
- inviteEmail = "automationqatt" + \
- Helper.rand_string("randomString") + "@gmail.com"
- return inviteEmail
-
- @staticmethod
- def generate_sshpub_key():
- try:
- logger.debug("About to generate SSH Public Key")
- key = rsa.generate_private_key(
- public_exponent=65537,
- key_size=2048,
- backend=default_backend()
- )
- private_key = key.private_bytes(
- encoding=crypto_serialization.Encoding.PEM,
- format=crypto_serialization.PrivateFormat.TraditionalOpenSSL,
- encryption_algorithm=crypto_serialization.NoEncryption())
- public_key = key.public_key().public_bytes(
- crypto_serialization.Encoding.OpenSSH,
- crypto_serialization.PublicFormat.OpenSSH
- ).decode("utf-8")
-
- logger.debug("Generated SSH Public Key: " + public_key)
- except Exception as e: # If failed write to log the error and return 'None'.
- logger.error(e)
- logger.error("Failed to generate SSH Public Key.")
- raise e
- return public_key
-
- @staticmethod
- def check_admin_ssh_existence(path, admin_ssh):
- if admin_ssh == open(path).read().rstrip('\n'):
- logger.debug(
- "Admin SSH already defined in DB and equal to the one stored on the local system.")
- return True
- return False
-
- @staticmethod
- def get_or_create_rsa_key_for_admin():
- try: # Create pair of keys for the given user and return his public key.
- ssh_folder = Constants.Paths.SSH.PATH
- public_file = ssh_folder + "id_rsa.pub"
- privateFile = ssh_folder + "id_rsa"
- admin_ssh_exist_and_equal = False
- admin_ssh = None
- if not os.path.exists(ssh_folder):
- os.makedirs(ssh_folder)
- elif os.path.exists(public_file):
- admin_ssh = DBUser.retrieve_admin_ssh_from_db()
- admin_ssh_exist_and_equal = Helper.check_admin_ssh_existence(
- public_file, admin_ssh)
- # TODO find pending gitlab bug causing old ssh key not be updated in gitlab cache
- if False and admin_ssh_exist_and_equal:
- return admin_ssh
- else:
- logger.debug("Private key file: " + privateFile +
- "\nPublice key file: " + public_file)
- key = rsa.generate_private_key(
- backend=crypto_default_backend(),
- public_exponent=65537,
- key_size=2048
- )
- private_key = key.private_bytes(
- crypto_serialization.Encoding.PEM,
- crypto_serialization.PrivateFormat.PKCS8,
- crypto_serialization.NoEncryption()).decode("utf-8")
- public_key = key.public_key().public_bytes(
- crypto_serialization.Encoding.OpenSSH,
- crypto_serialization.PublicFormat.OpenSSH
- ).decode("utf-8")
-
- with open(privateFile, 'w') as content_file:
- os.chmod(privateFile, 0o600)
- content_file.write(private_key) # Save private key to file.
- logger.debug(
- "Private key created successfully for admin")
- user_pub_key = public_key
- with open(public_file, 'w') as content_file:
- content_file.write(public_key) # Save public key to file.
- logger.debug("Public key created successfully for admin")
- cmd = 'ssh-keyscan ' + \
- settings.GITLAB_URL[7:-1] + ' >> ' + \
- ssh_folder + 'known_hosts'
- # Create known_hosts file and add GitLab to it.
- subprocess.Popen(cmd, shell=True, stderr=subprocess.PIPE)
- logger.debug("Added GitLab to known_hosts")
- return user_pub_key
- except Exception as error:
- logger.error(
- "_-_-_-_-_- Unexpected error in get_or_create_rsa_key_for_admin: %s" % error)
- raise Exception("Failed to create SSH keys for user admin", error)
-
- @staticmethod
- def internal_assert(x, y):
- try:
- Helper.tc.assertEqual(str(x), str(y))
- except Exception as e:
- raise Exception("AssertionError: \"" + str(x) +
- "\" != \"" + str(y) + "\"", e)
-
- @staticmethod
- def internal_assert_boolean(x, y):
- try:
- Helper.tc.assertEqual(str(x), str(y))
- return True
- except Exception as e:
- raise Exception("AssertionError: \"" + str(x) +
- "\" != \"" + str(y) + "\"", e)
-
- @staticmethod
- def internal_not_equal(x, y):
- try:
- Helper.tc.assertNotEqual(x, y)
- except Exception as e:
- raise Exception("AssertionError: \"" + str(x) +
- "\" != \"" + str(y) + "\"", e)
-
- @staticmethod
- def get_reset_passw_url(email):
- jwtObj = JWTAuthentication()
- token = jwtObj.createPersonalTokenWithExpiration(email)
- resetPasswURL = Constants.Default.LoginURL.TEXT + "?t=" + token
- return resetPasswURL
-
- @staticmethod
- def assertTrue(expr, msg=None):
- """Check that the expression is true."""
- if expr != True:
- raise Exception("AssertionError: \"not expr")
+logger = LoggingServiceFactory.get_logger() + + +class Helper: + + tc = unittest.TestCase('__init__') + + @staticmethod + def rand_string(type_of_value='randomString', numberOfDigits=0): + letters_and_numbers = string.ascii_letters + string.digits + if type_of_value == 'email': + myEmail = ''.join(random.choice(letters_and_numbers) for _ in range( + 4)) + "@" + ''.join(random.choice(string.ascii_uppercase) for _ in range(4)) + ".com" + return "ST" + myEmail + elif type_of_value == 'randomNumber': + randomNumber = ''.join("%s" % random.randint(2, 9) + for _ in range(0, (numberOfDigits + 1))) + return randomNumber + elif type_of_value == 'randomString': + randomString = "".join(random.sample(letters_and_numbers, 5)) + return "ST" + randomString + else: + raise Exception("invalid rand type") + + @staticmethod + def rand_invite_email(): + inviteEmail = "automationqatt" + \ + Helper.rand_string("randomString") + "@gmail.com" + return inviteEmail + + @staticmethod + def generate_sshpub_key(): + try: + logger.debug("About to generate SSH Public Key") + key = rsa.generate_private_key( + public_exponent=65537, + key_size=2048, + backend=default_backend() + ) + private_key = key.private_bytes( + encoding=crypto_serialization.Encoding.PEM, + format=crypto_serialization.PrivateFormat.TraditionalOpenSSL, + encryption_algorithm=crypto_serialization.NoEncryption()) + public_key = key.public_key().public_bytes( + crypto_serialization.Encoding.OpenSSH, + crypto_serialization.PublicFormat.OpenSSH + ).decode("utf-8") + + logger.debug("Generated SSH Public Key: " + public_key) + # If failed write to log the error and return 'None'. + except Exception as e: + logger.error(e) + logger.error("Failed to generate SSH Public Key.") + raise e + return public_key + + @staticmethod + def check_admin_ssh_existence(path, admin_ssh): + if admin_ssh == open(path).read().rstrip('\n'): + logger.debug( + "Admin SSH already defined in DB and equal to the one stored on the local system.") + return True + return False + + @staticmethod + def get_or_create_rsa_key_for_admin(): + # Create pair of keys for the given user and return his public key. + try: + ssh_folder = Constants.Paths.SSH.PATH + public_file = ssh_folder + "id_rsa.pub" + privateFile = ssh_folder + "id_rsa" + admin_ssh_exist_and_equal = False + admin_ssh = None + if not os.path.exists(ssh_folder): + os.makedirs(ssh_folder) + elif os.path.exists(public_file): + admin_ssh = DBUser.retrieve_admin_ssh_from_db() + admin_ssh_exist_and_equal = Helper.check_admin_ssh_existence( + public_file, admin_ssh) + # TODO find pending gitlab bug causing old ssh key not be updated + # in gitlab cache + if False and admin_ssh_exist_and_equal: + return admin_ssh + else: + logger.debug("Private key file: " + privateFile + + "\nPublice key file: " + public_file) + key = rsa.generate_private_key( + backend=crypto_default_backend(), + public_exponent=65537, + key_size=2048 + ) + private_key = key.private_bytes( + crypto_serialization.Encoding.PEM, + crypto_serialization.PrivateFormat.PKCS8, + crypto_serialization.NoEncryption()).decode("utf-8") + public_key = key.public_key().public_bytes( + crypto_serialization.Encoding.OpenSSH, + crypto_serialization.PublicFormat.OpenSSH + ).decode("utf-8") + + with open(privateFile, 'w') as content_file: + os.chmod(privateFile, 0o600) + # Save private key to file. + content_file.write(private_key) + logger.debug( + "Private key created successfully for admin") + user_pub_key = public_key + with open(public_file, 'w') as content_file: + content_file.write(public_key) # Save public key to file. + logger.debug("Public key created successfully for admin") + cmd = 'ssh-keyscan ' + \ + settings.GITLAB_URL[7:-1] + ' >> ' + \ + ssh_folder + 'known_hosts' + # Create known_hosts file and add GitLab to it. + subprocess.Popen(cmd, shell=True, stderr=subprocess.PIPE) + logger.debug("Added GitLab to known_hosts") + return user_pub_key + except Exception as error: + logger.error( + "_-_-_-_-_- Unexpected error in get_or_create_rsa_key_for_admin: %s" % error) + raise Exception("Failed to create SSH keys for user admin", error) + + @staticmethod + def internal_assert(x, y): + try: + Helper.tc.assertEqual(str(x), str(y)) + except Exception as e: + raise Exception("AssertionError: \"" + str(x) + + "\" != \"" + str(y) + "\"", e) + + @staticmethod + def internal_assert_boolean(x, y): + try: + Helper.tc.assertEqual(str(x), str(y)) + return True + except Exception as e: + raise Exception("AssertionError: \"" + str(x) + + "\" != \"" + str(y) + "\"", e) + + @staticmethod + def internal_assert_boolean_true_false(x, y): + return Helper.tc.assertEqual(str(x), str(y)) + + @staticmethod + def internal_not_equal(x, y): + try: + Helper.tc.assertNotEqual(x, y) + except Exception as e: + raise Exception("AssertionError: \"" + str(x) + + "\" != \"" + str(y) + "\"", e) + + @staticmethod + def get_reset_passw_url(email): + jwtObj = JWTAuthentication() + token = jwtObj.createPersonalTokenWithExpiration(email) + resetPasswURL = Constants.Default.LoginURL.TEXT + "?t=" + token + return resetPasswURL + + @staticmethod + def assertTrue(expr, msg=None): + """Check that the expression is true.""" + if expr != True: + raise Exception("AssertionError: \"not expr") |