diff options
Diffstat (limited to 'services/helper.py')
-rw-r--r-- | services/helper.py | 224 |
1 files changed, 0 insertions, 224 deletions
diff --git a/services/helper.py b/services/helper.py deleted file mode 100644 index dd779be..0000000 --- a/services/helper.py +++ /dev/null @@ -1,224 +0,0 @@ -# ============LICENSE_START========================================== -# org.onap.vvp/test-engine -# =================================================================== -# Copyright © 2017 AT&T Intellectual Property. All rights reserved. -# =================================================================== -# -# Unless otherwise specified, all software contained herein is licensed -# under the Apache License, Version 2.0 (the “License”); -# you may not use this software except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# -# -# -# Unless otherwise specified, all documentation contained herein is licensed -# under the Creative Commons License, Attribution 4.0 Intl. (the “License”); -# you may not use this documentation except in compliance with the License. -# You may obtain a copy of the License at -# -# https://creativecommons.org/licenses/by/4.0/ -# -# Unless required by applicable law or agreed to in writing, documentation -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# -# ============LICENSE_END============================================ -# -# ECOMP is a trademark and service mark of AT&T Intellectual Property. -import os -import random -import string -import subprocess -import unittest - -from cryptography.hazmat.backends import \ - default_backend as crypto_default_backend, default_backend -from cryptography.hazmat.primitives import \ - serialization as crypto_serialization -from cryptography.hazmat.primitives.asymmetric import rsa -from django.conf import settings - -from services.constants import Constants -from services.database.db_user import DBUser -from services.logging_service import LoggingServiceFactory -from utils.authentication import JWTAuthentication - - -logger = LoggingServiceFactory.get_logger() - - -class Helper: - - tc = unittest.TestCase('__init__') - - @staticmethod - def rand_string(type_of_value='randomString', numberOfDigits=0): - letters_and_numbers = string.ascii_letters + string.digits - if type_of_value == 'email': - myEmail = ''.join( - random.choice(letters_and_numbers) for _ in range( - 4)) + "@" + ''.join( - random.choice( - string.ascii_uppercase) for _ in range(4)) + ".com" - return "ST" + myEmail - elif type_of_value == 'randomNumber': - randomNumber = ''.join("%s" % random.randint(2, 9) - for _ in range(0, (numberOfDigits + 1))) - return randomNumber - elif type_of_value == 'randomString': - randomString = "".join(random.sample(letters_and_numbers, 5)) - return "ST" + randomString - else: - raise Exception("invalid rand type") - - @staticmethod - def rand_invite_email(): - inviteEmail = "automationqatt" + \ - Helper.rand_string("randomString") + "@gmail.com" - return inviteEmail - - @staticmethod - def generate_sshpub_key(): - try: - logger.debug("About to generate SSH Public Key") - key = rsa.generate_private_key( - public_exponent=65537, - key_size=2048, - backend=default_backend() - ) - key.private_bytes( - encoding=crypto_serialization.Encoding.PEM, - format=crypto_serialization.PrivateFormat.TraditionalOpenSSL, - encryption_algorithm=crypto_serialization.NoEncryption()) - public_key = key.public_key().public_bytes( - crypto_serialization.Encoding.OpenSSH, - crypto_serialization.PublicFormat.OpenSSH - ).decode("utf-8") - - logger.debug("Generated SSH Public Key: " + public_key) - # If failed write to log the error and return 'None'. - except Exception as e: - logger.error(e) - logger.error("Failed to generate SSH Public Key.") - raise e - return public_key - - @staticmethod - def check_admin_ssh_existence(path, admin_ssh): - if admin_ssh == open(path).read().rstrip('\n'): - logger.debug( - "Admin SSH already defined in DB and equal " + - "to the one stored on the local system.") - return True - return False - - @staticmethod - def get_or_create_rsa_key_for_admin(): - # Create pair of keys for the given user and return his public key. - try: - ssh_folder = Constants.Paths.SSH.PATH - public_file = ssh_folder + "id_rsa.pub" - privateFile = ssh_folder + "id_rsa" - admin_ssh_exist_and_equal = False - admin_ssh = None - if not os.path.exists(ssh_folder): - os.makedirs(ssh_folder) - elif os.path.exists(public_file): - admin_ssh = DBUser.retrieve_admin_ssh_from_db() - admin_ssh_exist_and_equal = Helper.check_admin_ssh_existence( - public_file, admin_ssh) - # TODO find pending gitlab bug causing old ssh key not be updated - # in gitlab cache - if False and admin_ssh_exist_and_equal: - return admin_ssh - else: - logger.debug("Private key file: " + privateFile + - "\nPublice key file: " + public_file) - key = rsa.generate_private_key( - backend=crypto_default_backend(), - public_exponent=65537, - key_size=2048 - ) - private_key = key.private_bytes( - crypto_serialization.Encoding.PEM, - crypto_serialization.PrivateFormat.PKCS8, - crypto_serialization.NoEncryption()).decode("utf-8") - public_key = key.public_key().public_bytes( - crypto_serialization.Encoding.OpenSSH, - crypto_serialization.PublicFormat.OpenSSH - ).decode("utf-8") - - with open(privateFile, 'w') as content_file: - os.chmod(privateFile, 0o600) - # Save private key to file. - content_file.write(private_key) - logger.debug( - "Private key created successfully for admin") - user_pub_key = public_key - with open(public_file, 'w') as content_file: - content_file.write(public_key) # Save public key to file. - logger.debug("Public key created successfully for admin") - cmd = 'ssh-keyscan ' + \ - settings.GITLAB_URL[7:-1] + ' >> ' + \ - ssh_folder + 'known_hosts' - # Create known_hosts file and add GitLab to it. - subprocess.Popen(cmd, shell=True, stderr=subprocess.PIPE) - logger.debug("Added GitLab to known_hosts") - return user_pub_key - except Exception as error: - logger.error( - "_-_-_-_-_- Unexpected error in " + - "get_or_create_rsa_key_for_admin: %s" % error) - raise Exception("Failed to create SSH keys for user admin", error) - - @staticmethod - def internal_assert(x, y): - try: - Helper.tc.assertEqual(str(x), str(y)) - except Exception as e: - raise Exception("AssertionError: \"" + str(x) + - "\" != \"" + str(y) + "\"", e) - - @staticmethod - def internal_assert_boolean(x, y): - try: - Helper.tc.assertEqual(str(x), str(y)) - return True - except Exception as e: - raise Exception("AssertionError: \"" + str(x) + - "\" != \"" + str(y) + "\"", e) - - @staticmethod - def internal_assert_boolean_true_false(x, y): - return Helper.tc.assertEqual(str(x), str(y)) - - @staticmethod - def internal_not_equal(x, y): - try: - Helper.tc.assertNotEqual(x, y) - except Exception as e: - raise Exception("AssertionError: \"" + str(x) + - "\" != \"" + str(y) + "\"", e) - - @staticmethod - def get_reset_passw_url(email): - jwtObj = JWTAuthentication() - token = jwtObj.createPersonalTokenWithExpiration(email) - resetPasswURL = Constants.Default.LoginURL.TEXT + "?t=" + token - return resetPasswURL - - @staticmethod - def assertTrue(expr, msg=None): - """Check that the expression is true.""" - if not expr: - raise Exception("AssertionError: \"not expr") |