diff options
-rw-r--r-- | pgaas/pgaas/__init__.py | 3 | ||||
-rw-r--r-- | pgaas/pgaas/logginginterface.py | 10 | ||||
-rw-r--r-- | pgaas/pgaas/pgaas_plugin.py | 475 | ||||
-rw-r--r-- | pgaas/pgaas_types.yaml | 5 | ||||
-rw-r--r-- | pgaas/pom.xml | 11 | ||||
-rw-r--r-- | pgaas/tests/psycopg2.py | 39 | ||||
-rw-r--r-- | pgaas/tests/test_plugin.py | 170 | ||||
-rw-r--r-- | pgaas/tox.ini | 11 |
8 files changed, 536 insertions, 188 deletions
diff --git a/pgaas/pgaas/__init__.py b/pgaas/pgaas/__init__.py index e3c966c..4f8c969 100644 --- a/pgaas/pgaas/__init__.py +++ b/pgaas/pgaas/__init__.py @@ -1,3 +1,6 @@ +""" +PostgreSQL plugin to manage passwords +""" import logging def get_module_logger(mod_name): diff --git a/pgaas/pgaas/logginginterface.py b/pgaas/pgaas/logginginterface.py index 82fe4f2..cd2a774 100644 --- a/pgaas/pgaas/logginginterface.py +++ b/pgaas/pgaas/logginginterface.py @@ -6,9 +6,9 @@ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -16,8 +16,14 @@ # limitations under the License. # ============LICENSE_END====================================================== +""" +PostgreSQL plugin to manage passwords +""" + from cloudify import ctx +# pragma pylint: disable=bad-indentation + def debug(msg): """ Print a debugging message. diff --git a/pgaas/pgaas/pgaas_plugin.py b/pgaas/pgaas/pgaas_plugin.py index a5e4440..73f7db6 100644 --- a/pgaas/pgaas/pgaas_plugin.py +++ b/pgaas/pgaas/pgaas_plugin.py @@ -6,9 +6,9 @@ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -16,30 +16,49 @@ # limitations under the License. # ============LICENSE_END====================================================== +""" +PostgreSQL plugin to manage passwords +""" + from __future__ import print_function +import sys +USING_PYTHON2 = sys.version_info[0] < 3 +# pylint: disable=wrong-import-position +# pylint: disable=wrong-import-order +# pylint: disable=import-error from cloudify import ctx from cloudify.decorators import operation from cloudify.exceptions import NonRecoverableError from cloudify.exceptions import RecoverableError -from logginginterface import * +# pylint: disable=wildcard-import +if USING_PYTHON2: + from logginginterface import debug, info, warn, error +else: + from .logginginterface import debug, info, warn, error import os import re import json import hashlib import socket -import sys import traceback import base64 -import urllib +if USING_PYTHON2: + import urllib +else: + import urllib.request + import urllib.parse + import urllib.error +import collections + -opath = sys.path -sys.path = list(opath) +SYSPATH = sys.path +sys.path = list(SYSPATH) sys.path.append('/usr/lib64/python2.7/site-packages') import psycopg2 -sys.path = opath +sys.path = SYSPATH """ To set up a cluster: @@ -65,7 +84,9 @@ sys.path = opath type: dcae.nodes.pgaas.cluster properties: writerfqdn: { get_input: k8s_pgaas_instance_fqdn } - # OR: writerfqdn: { concat: [ { get_input: location_prefix }, '-', { get_input: pgaas_cluster_name }, '-write.', { get_input: location_domain } ] } + # OR: writerfqdn: { concat: [ { get_input: location_prefix }, '-', + # { get_input: pgaas_cluster_name }, '-write.', + # { get_input: location_domain } ] } # OR: writerfqdn: { get_property: [ dns_pgrs_rw, fqdn ] } use_existing: true @@ -78,8 +99,12 @@ sys.path = opath writerfqdn: { get_input: k8s_pgaas_instance_fqdn } readerfqdn: { get_input: k8s_pgaas_instance_fqdn } # OR: - # writerfqdn: { concat: [ { get_input: location_prefix }, '-', { get_input: pgaas_cluster_name }, '-write.', { get_input: location_domain } ] } - # readerfqdn: { concat: [ { get_input: location_prefix }, '-', { get_input: pgaas_cluster_name }, '.', { get_input: location_domain } ] } + # writerfqdn: { concat: [ { get_input: location_prefix }, '-', + # { get_input: pgaas_cluster_name }, '-write.', + # { get_input: location_domain } ] } + # readerfqdn: { concat: [ { get_input: location_prefix }, '-', + # { get_input: pgaas_cluster_name }, '.', + # { get_input: location_domain } ] } initialpassword: { get_input: currentpassword } relationships: - type: dcae.relationships.pgaas_cluster_uses_sshkeypair @@ -92,21 +117,25 @@ sys.path = opath To set up a database: - http://$NEXUS/raw/type_files/pgaas_types.yaml - pgaasdbtest: - type: dcae.nodes.pgaas.database - properties: + pgaasdbtest: + type: dcae.nodes.pgaas.database + properties: writerfqdn: { get_input: k8s_pgaas_instance_fqdn } - # OR: writerfqdn: { concat: [ { get_input: location_prefix }, '-', { get_input: pgaas_cluster_name }, '-write.', { get_input: location_domain } ] } + # OR: writerfqdn: { concat: [ { get_input: location_prefix }, '-', + # { get_input: pgaas_cluster_name }, '-write.', + # { get_input: location_domain } ] } # OR: writerfqdn: { get_property: [ dns_pgrs_rw, fqdn ] } name: { get_input: database_name } To reference an existing database: - http://$NEXUS/raw/type_files/pgaas_types.yaml - $CLUSTER_$DBNAME: - type: dcae.nodes.pgaas.database - properties: + $CLUSTER_$DBNAME: + type: dcae.nodes.pgaas.database + properties: writerfqdn: { get_input: k8s_pgaas_instance_fqdn } - # OR: writerfqdn: { concat: [ { get_input: location_prefix }, '-', { get_input: pgaas_cluster_name }, '-write.', { get_input: location_domain } ] } + # OR: writerfqdn: { concat: [ { get_input: location_prefix }, '-', + # { get_input: pgaas_cluster_name }, '-write.', + # { get_input: location_domain } ] } # OR: writerfqdn: { get_property: [ dns_pgrs_rw, fqdn ] } name: { get_input: database_name } use_existing: true @@ -118,7 +147,7 @@ sys.path = opath description: Admin Username for $CLUSTER $DBNAME database value: { get_attribute: [ $CLUSTER_$DBNAME, admin, user ] } $CLUSTER_$DBNAME_admin_password: - description: Admin Password for $CLUSTER $DBNAME database + description: Admin Password for $CLUSTER $DBNAME database value: { get_attribute: [ $CLUSTER_$DBNAME, admin, password ] } $CLUSTER_$DBNAME_user_host: description: Hostname for $CLUSTER $DBNAME database @@ -141,14 +170,23 @@ sys.path = opath """ -OPT_MANAGER_RESOURCES = "/opt/manager/resources" +OPT_MANAGER_RESOURCES_PGAAS = "/opt/manager/resources/pgaas" -def setOptManagerResources(o): - global OPT_MANAGER_RESOURCES - OPT_MANAGER_RESOURCES = o +# pylint: disable=invalid-name +def setOptManagerResources(o): # pylint: disable=global-statement + """ + Overrides the default locations of /opt/managers/resources + """ + # pylint: disable=global-statement + global OPT_MANAGER_RESOURCES_PGAAS + OPT_MANAGER_RESOURCES_PGAAS = "{}/pgaas".format(o) def safestr(s): - return urllib.quote(str(s), '') + """ + returns a safely printable version of the string + """ + # pylint: disable=no-member + return urllib.quote(str(s), '') if USING_PYTHON2 else urllib.parse.quote(str(s), '') def raiseRecoverableError(msg): """ @@ -167,18 +205,33 @@ def raiseNonRecoverableError(msg): raise NonRecoverableError(msg) def dbexecute(crx, cmd, args=None): + """ + executes the SQL statement + Prints the entire command for debugging purposes + """ debug("executing {}".format(cmd)) crx.execute(cmd, args) + +def dbexecute_trunc_print(crx, cmd, args=None): + """ + executes the SQL statement. + Will print only the first 30 characters in the command + Use this function if you are executing an SQL cmd with a password + """ + debug("executing {}".format(cmd[:30])) + crx.execute(cmd, args) + + def waithp(host, port): """ do a test connection to a host and port """ - debug("waithp({0},{1})".format(safestr(host),safestr(port))) + debug("waithp({0},{1})".format(safestr(host), safestr(port))) sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) try: sock.connect((host, int(port))) - except: + except: # pylint: disable=bare-except a, b, c = sys.exc_info() traceback.print_exception(a, b, c) sock.close() @@ -189,8 +242,8 @@ def doconn(desc): """ open an SQL connection to the PG server """ - debug("doconn({},{},{})".format(desc['host'], desc['user'],desc['database'])) - # debug("doconn({},{},{},{})".format(desc['host'], desc['user'],desc['database'],desc['password'])) + debug("doconn({},{},{})".format(desc['host'], desc['user'], desc['database'])) + # debug("doconn({},{},{},{})".format(desc['host'], desc['user'], desc['database'], desc['password'])) ret = psycopg2.connect(**desc) ret.autocommit = True return ret @@ -225,6 +278,7 @@ def rootdesc(data, dbname, initialpassword=None): return the postgres connection information """ debug("rootdesc(..data..,{0})".format(safestr(dbname))) + # pylint: disable=bad-continuation return { 'database': dbname, 'host': hostportion(data['rw']), @@ -239,14 +293,14 @@ def rootconn(data, dbname='postgres', initialpassword=None): connecting to the specified database """ debug("rootconn(..data..,{0})".format(safestr(dbname))) - ret = doconn(rootdesc(data, dbname, initialpassword)) - return ret + return doconn(rootdesc(data, dbname, initialpassword)) def onedesc(data, dbname, role, access): """ return the connection information for a given user and dbname on a cluster """ user = '{0}_{1}'.format(dbname, role) + # pylint: disable=bad-continuation return { 'database': dbname, 'host': hostportion(data[access]), @@ -259,6 +313,7 @@ def dbdescs(data, dbname): """ return the entire set of information for a specific server/database """ + # pylint: disable=bad-continuation return { 'admin': onedesc(data, dbname, 'admin', 'rw'), 'user': onedesc(data, dbname, 'user', 'rw'), @@ -275,20 +330,20 @@ def getpass(data, ident, hostport, dbname): # mix in the seed (the last line) for that database, if one exists hostport = hostport.lower() dbname = dbname.lower() - hostPortDbname = '{0}/pgaas/{1}:{2}'.format(OPT_MANAGER_RESOURCES, hostport, dbname) + hostPortDbname = '{0}/{1}:{2}'.format(OPT_MANAGER_RESOURCES_PGAAS, hostport, dbname) try: lastLine = '' with open(hostPortDbname, "r") as fp: for line in fp: - lastLine = line.strip() - m.update(line) + lastLine = line + m.update(lastLine) except IOError: pass m.update(base64.b64decode(data['data'])) return m.hexdigest() -def find_related_nodes(reltype, inst = None): +def find_related_nodes(reltype, inst=None): """ extract the related_nodes information from the context for a specific relationship @@ -305,6 +360,8 @@ def chkfqdn(fqdn): """ verify that a FQDN is valid """ + if fqdn is None: + return False hp = hostportion(fqdn) # not needed right now: pp = portportion(fqdn) # TODO need to augment this for IPv6 addresses @@ -315,62 +372,104 @@ def chkdbname(dbname): verify that a database name is valid """ ret = re.match('[a-zA-Z][a-zA-Z0-9]{0,43}', dbname) is not None and dbname != 'postgres' - if not ret: warn("Invalid dbname: {0}".format(safestr(dbname))) + if not ret: + warn("Invalid dbname: {0}".format(safestr(dbname))) return ret +def get_valid_domains(): + """ + Return a list of the valid names, suitable for inclusion in an error message. + """ + msg = '' + import glob + validDomains = [] + for f in glob.glob('{}/*'.format(OPT_MANAGER_RESOURCES_PGAAS)): + try: + with open(f, "r") as fp: + try: + tmpdata = json.load(fp) + if 'pubkey' in tmpdata: + validDomains.append(os.path.basename(f)) + except: # pylint: disable=bare-except + pass + except: # pylint: disable=bare-except + pass + if len(validDomains) == 0: + msg += '\nNo valid PostgreSQL cluster information was found' + else: + msg += '\nThese are the valid PostgreSQL cluster domains found on this manager:' + for v in validDomains: + msg += '\n\t"{}"'.format(v) + return msg + +def get_existing_clusterinfo(wfqdn, rfqdn, related): + """ + Retrieve all of the information specific to an existing cluster. + """ + if rfqdn != '': + raiseNonRecoverableError('Read-only FQDN must not be specified when using an existing cluster, fqdn={0}'.format(safestr(rfqdn))) + if len(related) != 0: + raiseNonRecoverableError('Cluster SSH keypair must not be specified when using an existing cluster') + try: + fn = '{0}/{1}'.format(OPT_MANAGER_RESOURCES_PGAAS, wfqdn.lower()) + with open(fn, 'r') as f: + data = json.load(f) + data['rw'] = wfqdn + return data + except Exception as e: # pylint: disable=broad-except + warn("Error: {0}".format(e)) + msg = 'Cluster must be deployed when using an existing cluster.\nCheck your domain name: fqdn={0}\nerr={1}'.format(safestr(wfqdn), e) + if not os.path.isdir(OPT_MANAGER_RESOURCES_PGAAS): + msg += '\nThe directory {} does not exist. No PostgreSQL clusters have been deployed on this manager.'.format(OPT_MANAGER_RESOURCES_PGAAS) + else: + msg += get_valid_domains() + # warn("Stack: {0}".format(traceback.format_exc())) + raiseNonRecoverableError(msg) + def getclusterinfo(wfqdn, reuse, rfqdn, initialpassword, related): """ Retrieve all of the information specific to a cluster. if reuse, retrieve it else create and store it """ - debug("getclusterinfo({}, {}, {}, {}, ..related..)".format(safestr(wfqdn), safestr(reuse), safestr(rfqdn), safestr(initialpassword))) + # debug("getclusterinfo({}, {}, {}, {}, ..related..)".format(safestr(wfqdn), safestr(reuse), safestr(rfqdn), safestr(initialpassword))) + debug("getclusterinfo({}, {}, {}, ..related..)".format(safestr(wfqdn), safestr(reuse), safestr(rfqdn))) if not chkfqdn(wfqdn): raiseNonRecoverableError('Invalid FQDN specified for admin/read-write access, fqdn={0}'.format(safestr(wfqdn))) if reuse: - if rfqdn != '': - raiseNonRecoverableError('Read-only FQDN must not be specified when using an existing cluster, fqdn={0}'.format(safestr(rfqdn))) - if len(related) != 0: - raiseNonRecoverableError('Cluster SSH keypair must not be specified when using an existing cluster') - try: - fn = '{0}/pgaas/{1}'.format(OPT_MANAGER_RESOURCES, wfqdn.lower()) - with open(fn, 'r') as f: - data = json.load(f) - data['rw'] = wfqdn - return data - except Exception as e: - warn("Error: {0}".format(e)) - warn("Stack: {0}".format(traceback.format_exc())) - raiseNonRecoverableError('Cluster must be deployed when using an existing cluster. Check your domain name: fqdn={0}, err={1}'.format(safestr(wfqdn),e)) + return get_existing_clusterinfo(wfqdn, rfqdn, related) + if rfqdn == '': rfqdn = wfqdn elif not chkfqdn(rfqdn): raiseNonRecoverableError('Invalid FQDN specified for read-only access, fqdn={0}'.format(safestr(rfqdn))) if len(related) != 1: - raiseNonRecoverableError('Cluster SSH keypair must be specified using a dcae.relationships.pgaas_cluster_uses_sshkeypair relationship to a dcae.nodes.sshkeypair node') - data = { 'ro': rfqdn, 'pubkey': related[0].instance.runtime_properties['public'], 'data': related[0].instance.runtime_properties['base64private'], 'hash': 'sha256' } - os.umask(077) + raiseNonRecoverableError('Cluster SSH keypair must be specified using a dcae.relationships.pgaas_cluster_uses_sshkeypair ' + + 'relationship to a dcae.nodes.sshkeypair node') + data = {'ro': rfqdn, 'pubkey': related[0].instance.runtime_properties['public'], + 'data': related[0].instance.runtime_properties['base64private'], 'hash': 'sha256'} + os.umask(0o77) try: - os.makedirs('{0}/pgaas'.format(OPT_MANAGER_RESOURCES)) - except: + os.makedirs('{0}'.format(OPT_MANAGER_RESOURCES_PGAAS)) + except: # pylint: disable=bare-except pass try: - with open('{0}/pgaas/{1}'.format(OPT_MANAGER_RESOURCES, wfqdn.lower()), 'w') as f: + with open('{0}/{1}'.format(OPT_MANAGER_RESOURCES_PGAAS, wfqdn.lower()), 'w') as f: f.write(json.dumps(data)) - except Exception as e: + except Exception as e: # pylint: disable=broad-except warn("Error: {0}".format(e)) warn("Stack: {0}".format(traceback.format_exc())) - raiseNonRecoverableError('Cannot write cluster information to {0}/pgaas: fqdn={1}, err={2}'.format(OPT_MANAGER_RESOURCES, safestr(wfqdn), e)) + raiseNonRecoverableError('Cannot write cluster information to {0}: fqdn={1}, err={2}'.format(OPT_MANAGER_RESOURCES_PGAAS, safestr(wfqdn), e)) data['rw'] = wfqdn if initialpassword: with rootconn(data, initialpassword=initialpassword) as conn: crr = conn.cursor() - dbexecute(crr, "ALTER USER postgres WITH PASSWORD %s", (getpass(data, 'postgres', wfqdn, 'postgres'),)) + dbexecute_trunc_print(crr, "ALTER USER postgres WITH PASSWORD %s", (getpass(data, 'postgres', wfqdn, 'postgres'),)) crr.close() - return(data) + return data @operation -def add_pgaas_cluster(**kwargs): +def add_pgaas_cluster(**kwargs): # pylint: disable=unused-argument """ dcae.nodes.pgaas.cluster: Record key generation data for cluster @@ -384,15 +483,15 @@ def add_pgaas_cluster(**kwargs): find_related_nodes('dcae.relationships.pgaas_cluster_uses_sshkeypair')) ctx.instance.runtime_properties['public'] = data['pubkey'] ctx.instance.runtime_properties['base64private'] = data['data'] - # not needed right now: ctx.instance.runtime_properties['postgrespswd'] = getpass(data, 'postgres', ctx.node.properties['writerfqdn'], 'postgres') + ctx.instance.runtime_properties['postgrespswd'] = getpass(data, 'postgres', ctx.node.properties['writerfqdn'], 'postgres') warn('All done') - except Exception as e: + except Exception as e: # pylint: disable=broad-except ctx.logger.warn("Error: {0}".format(e)) ctx.logger.warn("Stack: {0}".format(traceback.format_exc())) raise e @operation -def rm_pgaas_cluster(**kwargs): +def rm_pgaas_cluster(**kwargs): # pylint: disable=unused-argument """ dcae.nodes.pgaas.cluster: Remove key generation data for cluster @@ -401,9 +500,9 @@ def rm_pgaas_cluster(**kwargs): warn("rm_pgaas_cluster()") wfqdn = ctx.node.properties['writerfqdn'] if chkfqdn(wfqdn) and not ctx.node.properties['use_existing']: - os.remove('{0}/pgaas/{1}'.format(OPT_MANAGER_RESOURCES, wfqdn)) + os.remove('{0}/{1}'.format(OPT_MANAGER_RESOURCES_PGAAS, wfqdn)) warn('All done') - except Exception as e: + except Exception as e: # pylint: disable=broad-except ctx.logger.warn("Error: {0}".format(e)) ctx.logger.warn("Stack: {0}".format(traceback.format_exc())) raise e @@ -417,14 +516,23 @@ def dbgetinfo(refctx): related = find_related_nodes('dcae.relationships.database_runson_pgaas_cluster', refctx.instance) if wfqdn == '': if len(related) != 1: - raiseNonRecoverableError('Database Cluster must be specified using exactly one dcae.relationships.database_runson_pgaas_cluster relationship to a dcae.nodes.pgaas.cluster node when writerfqdn is not specified') + raiseNonRecoverableError('Database Cluster must be specified using exactly one dcae.relationships.database_runson_pgaas_cluster relationship ' + + 'to a dcae.nodes.pgaas.cluster node when writerfqdn is not specified') wfqdn = related[0].node.properties['writerfqdn'] + return dbgetinfo_for_update(wfqdn) + +def dbgetinfo_for_update(wfqdn): + """ + Get the data associated with a database. + Make sure the connection exists. + """ + if not chkfqdn(wfqdn): raiseNonRecoverableError('Invalid FQDN specified for admin/read-write access, fqdn={0}'.format(safestr(wfqdn))) ret = getclusterinfo(wfqdn, True, '', '', []) waithp(hostportion(wfqdn), portportion(wfqdn)) return ret - + @operation def create_database(**kwargs): """ @@ -438,22 +546,22 @@ def create_database(**kwargs): if not chkdbname(dbname): raiseNonRecoverableError('Unacceptable or missing database name: {0}'.format(safestr(dbname))) debug('create_database(): dbname checked out') - info = dbgetinfo(ctx) + dbinfo = dbgetinfo(ctx) debug('Got db server info') - descs = dbdescs(info, dbname) + descs = dbdescs(dbinfo, dbname) ctx.instance.runtime_properties['admin'] = descs['admin'] ctx.instance.runtime_properties['user'] = descs['user'] ctx.instance.runtime_properties['viewer'] = descs['viewer'] - with rootconn(info) as conn: + with rootconn(dbinfo) as conn: crx = conn.cursor() - dbexecute(crx,'SELECT datname FROM pg_database WHERE datistemplate = false') - existingdbs = [ x[0] for x in crx ] + dbexecute(crx, 'SELECT datname FROM pg_database WHERE datistemplate = false') + existingdbs = [x[0] for x in crx] if ctx.node.properties['use_existing']: if dbname not in existingdbs: raiseNonRecoverableError('use_existing specified but database does not exist, dbname={0}'.format(safestr(dbname))) return - dbexecute(crx,'SELECT rolname FROM pg_roles') - existingroles = [ x[0] for x in crx ] + dbexecute(crx, 'SELECT rolname FROM pg_roles') + existingroles = [x[0] for x in crx] admu = descs['admin']['user'] usru = descs['user']['user'] vwru = descs['viewer']['user'] @@ -461,45 +569,45 @@ def create_database(**kwargs): cvwr = '{0}_common_viewer_role'.format(dbname) schm = '{0}_db_common'.format(dbname) if admu not in existingroles: - dbexecute(crx,'CREATE USER {0} WITH PASSWORD %s'.format(admu), (descs['admin']['password'],)) + dbexecute_trunc_print(crx, 'CREATE USER {0} WITH PASSWORD %s'.format(admu), (descs['admin']['password'],)) if usru not in existingroles: - dbexecute(crx,'CREATE USER {0} WITH PASSWORD %s'.format(usru), (descs['user']['password'],)) + dbexecute_trunc_print(crx, 'CREATE USER {0} WITH PASSWORD %s'.format(usru), (descs['user']['password'],)) if vwru not in existingroles: - dbexecute(crx,'CREATE USER {0} WITH PASSWORD %s'.format(vwru), (descs['viewer']['password'],)) + dbexecute_trunc_print(crx, 'CREATE USER {0} WITH PASSWORD %s'.format(vwru), (descs['viewer']['password'],)) if cusr not in existingroles: - dbexecute(crx,'CREATE ROLE {0}'.format(cusr)) + dbexecute(crx, 'CREATE ROLE {0}'.format(cusr)) if cvwr not in existingroles: - dbexecute(crx,'CREATE ROLE {0}'.format(cvwr)) + dbexecute(crx, 'CREATE ROLE {0}'.format(cvwr)) if dbname not in existingdbs: - dbexecute(crx,'CREATE DATABASE {0} WITH OWNER {1}'.format(dbname, admu)) + dbexecute(crx, 'CREATE DATABASE {0} WITH OWNER {1}'.format(dbname, admu)) crx.close() - with rootconn(info, dbname) as dbconn: + with rootconn(dbinfo, dbname) as dbconn: crz = dbconn.cursor() - for r in [ cusr, cvwr, usru, vwru ]: - dbexecute(crz,'REVOKE ALL ON DATABASE {0} FROM {1}'.format(dbname, r)) - dbexecute(crz,'GRANT {0} TO {1}'.format(cvwr, cusr)) - dbexecute(crz,'GRANT {0} TO {1}'.format(cusr, admu)) - dbexecute(crz,'GRANT CONNECT ON DATABASE {0} TO {1}'.format(dbname, cvwr)) - dbexecute(crz,'CREATE SCHEMA IF NOT EXISTS {0} AUTHORIZATION {1}'.format(schm, admu)) - for r in [ admu, cusr, cvwr, usru, vwru ]: - dbexecute(crz,'ALTER ROLE {0} IN DATABASE {1} SET search_path = public, {2}'.format(r, dbname, schm)) - dbexecute(crz,'GRANT USAGE ON SCHEMA {0} to {1}'.format(schm, cvwr)) - dbexecute(crz,'GRANT CREATE ON SCHEMA {0} to {1}'.format(schm, admu)) - dbexecute(crz,'ALTER DEFAULT PRIVILEGES FOR ROLE {0} GRANT SELECT ON TABLES TO {1}'.format(admu, cvwr)) - dbexecute(crz,'ALTER DEFAULT PRIVILEGES FOR ROLE {0} GRANT INSERT, UPDATE, DELETE, TRUNCATE ON TABLES TO {1}'.format(admu, cusr)) - dbexecute(crz,'ALTER DEFAULT PRIVILEGES FOR ROLE {0} GRANT USAGE, SELECT, UPDATE ON SEQUENCES TO {1}'.format(admu, cusr)) - dbexecute(crz,'GRANT TEMP ON DATABASE {0} TO {1}'.format(dbname, cusr)) - dbexecute(crz,'GRANT {0} to {1}'.format(cusr, usru)) - dbexecute(crz,'GRANT {0} to {1}'.format(cvwr, vwru)) + for r in [cusr, cvwr, usru, vwru]: + dbexecute(crz, 'REVOKE ALL ON DATABASE {0} FROM {1}'.format(dbname, r)) + dbexecute(crz, 'GRANT {0} TO {1}'.format(cvwr, cusr)) + dbexecute(crz, 'GRANT {0} TO {1}'.format(cusr, admu)) + dbexecute(crz, 'GRANT CONNECT ON DATABASE {0} TO {1}'.format(dbname, cvwr)) + dbexecute(crz, 'CREATE SCHEMA IF NOT EXISTS {0} AUTHORIZATION {1}'.format(schm, admu)) + for r in [admu, cusr, cvwr, usru, vwru]: + dbexecute(crz, 'ALTER ROLE {0} IN DATABASE {1} SET search_path = public, {2}'.format(r, dbname, schm)) + dbexecute(crz, 'GRANT USAGE ON SCHEMA {0} to {1}'.format(schm, cvwr)) + dbexecute(crz, 'GRANT CREATE ON SCHEMA {0} to {1}'.format(schm, admu)) + dbexecute(crz, 'ALTER DEFAULT PRIVILEGES FOR ROLE {0} GRANT SELECT ON TABLES TO {1}'.format(admu, cvwr)) + dbexecute(crz, 'ALTER DEFAULT PRIVILEGES FOR ROLE {0} GRANT INSERT, UPDATE, DELETE, TRUNCATE ON TABLES TO {1}'.format(admu, cusr)) + dbexecute(crz, 'ALTER DEFAULT PRIVILEGES FOR ROLE {0} GRANT USAGE, SELECT, UPDATE ON SEQUENCES TO {1}'.format(admu, cusr)) + dbexecute(crz, 'GRANT TEMP ON DATABASE {0} TO {1}'.format(dbname, cusr)) + dbexecute(crz, 'GRANT {0} to {1}'.format(cusr, usru)) + dbexecute(crz, 'GRANT {0} to {1}'.format(cvwr, vwru)) crz.close() warn('All done') - except Exception as e: + except Exception as e: # pylint: disable=broad-except ctx.logger.warn("Error: {0}".format(e)) ctx.logger.warn("Stack: {0}".format(traceback.format_exc())) raise e @operation -def delete_database(**kwargs): +def delete_database(**kwargs): # pylint: disable=unused-argument """ dcae.nodes.pgaas.database: Delete a database from a cluster @@ -514,73 +622,164 @@ def delete_database(**kwargs): if ctx.node.properties['use_existing']: return debug('delete_database(): !use_existing') - info = dbgetinfo(ctx) + dbinfo = dbgetinfo(ctx) debug('Got db server info') - with rootconn(info) as conn: + with rootconn(dbinfo) as conn: crx = conn.cursor() admu = ctx.instance.runtime_properties['admin']['user'] usru = ctx.instance.runtime_properties['user']['user'] vwru = ctx.instance.runtime_properties['viewer']['user'] cusr = '{0}_common_user_role'.format(dbname) cvwr = '{0}_common_viewer_role'.format(dbname) - dbexecute(crx,'DROP DATABASE IF EXISTS {0}'.format(dbname)) - for r in [ usru, vwru, admu, cusr, cvwr ]: - dbexecute(crx,'DROP ROLE IF EXISTS {0}'.format(r)) + dbexecute(crx, 'DROP DATABASE IF EXISTS {0}'.format(dbname)) + for r in [usru, vwru, admu, cusr, cvwr]: + dbexecute(crx, 'DROP ROLE IF EXISTS {0}'.format(r)) warn('All gone') - except Exception as e: + except Exception as e: # pylint: disable=broad-except ctx.logger.warn("Error: {0}".format(e)) ctx.logger.warn("Stack: {0}".format(traceback.format_exc())) raise e +############################################################# +# function: update_database # +# Purpose: Called as a workflow to change the database # +# passwords for all the users # +# # +# Invoked via: # +# cfy executions start -d <deployment-id> update_db_passwd # +# # +# Assumptions: # +# 1) pgaas_types.yaml must define a work flow e.g. # +# workflows: # +# update_db_passwd : # +# mapping : pgaas.pgaas.pgaas_plugin.update_database # +# 2) DB Blueprint: node_template must have properties: # +# writerfqdn & name (of DB) # +############################################################# +# pylint: disable=unused-argument @operation -def update_database(**kwargs): +def update_database(refctx, **kwargs): """ dcae.nodes.pgaas.database: Update the password for a database from a cluster + refctx is auto injected into the function when called as a workflow """ try: debug("update_database() invoked") - dbname = ctx.node.properties['name'] - warn("update_database({0})".format(safestr(dbname))) + + ################################################ + # Verify refctx contains the <nodes> attribute. # + # The workflow context might not be consistent # + # across different cloudify versions # + ################################################ + if not hasattr(refctx, 'nodes'): + raiseNonRecoverableError('workflow context does not contain attribute=<nodes>. dir(refctx)={}'.format(dir(refctx))) + + ############################################ + # Verify that refctx.nodes is iterable # + ############################################ + if not isinstance(refctx.nodes, collections.Iterable): + raiseNonRecoverableError("refctx.nodes is not an iterable. Type={}".format(type(refctx.nodes))) + + ctx_node = None + ############################################## + # Iterate through the nodes until we find # + # one with the properties we are looking for # + ############################################## + for i in refctx.nodes: + + ############################################ + # Safeguard: If a given node doesn't have # + # properties then skip it. # + # Don't cause an exception since the nodes # + # entry we are searching might still exist # + ############################################ + if not hasattr(i, 'properties'): + warn('Encountered a ctx node that does not have attr=<properties>. dir={}'.format(dir(i))) + continue + + debug("ctx node has the following Properties: {}".format(list(i.properties.keys()))) + + if ('name' in i.properties) and ('writerfqdn' in i.properties): + ctx_node = i + break + + + ############################################### + # If none of the nodes have properties: # + # <name> and <writerfqdn> then fatal error # + ############################################### + if not ctx_node: + raiseNonRecoverableError('Either <name> or <writerfqdn> is not found in refctx.nodes.properties.') + + debug("name is {}".format(ctx_node.properties['name'])) + debug("host is {}".format(ctx_node.properties['writerfqdn'])) + + dbname = ctx_node.properties['name'] + debug("update_database({0})".format(safestr(dbname))) + + ########################### + # dbname must be valid # + ########################### if not chkdbname(dbname): - return - debug('update_database(): dbname checked out') - if ctx.node.properties['use_existing']: - return - debug('update_database(): !use_existing') - hostport = ctx.node.properties['writerfqdn'] + raiseNonRecoverableError('dbname is null') + + + hostport = ctx_node.properties['writerfqdn'] debug('update_database(): wfqdn={}'.format(hostport)) - info = dbgetinfo(ctx) - debug('Got db server info') - hostPortDbname = '{0}/pgaas/{1}:{2}'.format(OPT_MANAGER_RESOURCES, hostport.lower(), dbname.lower()) + dbinfo = dbgetinfo_for_update(hostport) + + #debug('Got db server info={}'.format(dbinfo)) + + hostPortDbname = '{0}/{1}:{2}'.format(OPT_MANAGER_RESOURCES_PGAAS, hostport.lower(), dbname.lower()) + debug('update_database(): hostPortDbname={}'.format(hostPortDbname)) try: - appended = False - with open(hostPortDbname, "a") as fp: - with open("/dev/urandom", "rb") as rp: - b = rp.read(16) - import binascii - print(binascii.hexlify(b).decode('utf-8'), file=fp) - appended = True - if not appended: - ctx.logger.warn("Error: the password for {} {} was not successfully changed".format(hostport, dbname)) - except Exception as e: + appended = False + with open(hostPortDbname, "a") as fp: + with open("/dev/urandom", "rb") as rp: + b = rp.read(16) + import binascii + print(binascii.hexlify(b).decode('utf-8'), file=fp) + appended = True + if not appended: + ctx.logger.warn("Error: the password for {} {} was not successfully changed".format(hostport, dbname)) + except Exception as e: # pylint: disable=broad-except ctx.logger.warn("Error: {0}".format(e)) ctx.logger.warn("Stack: {0}".format(traceback.format_exc())) raise e - with rootconn(info) as conn: + descs = dbdescs(dbinfo, dbname) + + ########################################## + # Verify we have expected keys # + # <admin>, <user>, and <viewer> as well # + # as "sub-key" <user> # + ########################################## + + if not isinstance(descs, dict): + raiseNonRecoverableError('db descs has unexpected type=<{}> was expected type dict'.format(type(descs))) + + for key in ("admin", "user", "viewer"): + if key not in descs: + raiseNonRecoverableError('db descs does not contain key=<{}>. Keys found for descs are: {}'.format(key, list(descs.keys()))) + if 'user' not in descs[key]: + raiseNonRecoverableError('db descs[{}] does not contain key=<user>. Keys found for descs[{}] are: {}'.format(key, key, list(descs[key].keys()))) + + + with rootconn(dbinfo) as conn: crx = conn.cursor() - admu = ctx.instance.runtime_properties['admin']['user'] - usru = ctx.instance.runtime_properties['user']['user'] - vwru = ctx.instance.runtime_properties['viewer']['user'] - cusr = '{0}_common_user_role'.format(dbname) - cvwr = '{0}_common_viewer_role'.format(dbname) - for r in [ usru, vwru, admu ]: - dbexecute(crx,"ALTER USER {} WITH PASSWORD '{}'".format(r, getpass(info, r, hostport, dbname))) + + admu = descs['admin']['user'] + usru = descs['user']['user'] + vwru = descs['viewer']['user'] + + for r in [usru, vwru, admu]: + dbexecute_trunc_print(crx, "ALTER USER {} WITH PASSWORD '{}'".format(r, getpass(dbinfo, r, hostport, dbname))) + #debug("user={} password={}".format(r, getpass(dbinfo, r, hostport, dbname))) warn('All users updated for database {}'.format(dbname)) - except Exception as e: + except Exception as e: # pylint: disable=broad-except ctx.logger.warn("Error: {0}".format(e)) ctx.logger.warn("Stack: {0}".format(traceback.format_exc())) raise e diff --git a/pgaas/pgaas_types.yaml b/pgaas/pgaas_types.yaml index 2118231..60a8fa7 100644 --- a/pgaas/pgaas_types.yaml +++ b/pgaas/pgaas_types.yaml @@ -53,7 +53,6 @@ node_types: cloudify.interfaces.lifecycle: create: pgaas.pgaas.pgaas_plugin.create_database delete: pgaas.pgaas.pgaas_plugin.delete_database - update: pgaas.pgaas.pgaas_plugin.update_database relationships: dcae.relationships.pgaas_cluster_uses_sshkeypair: @@ -62,3 +61,7 @@ relationships: derived_from: cloudify.relationships.contained_in dcae.relationships.application_uses_pgaas_database: derived_from: cloudify.relationships.connected_to + +workflows: + update_db_passwd : + mapping : pgaas.pgaas.pgaas_plugin.update_database diff --git a/pgaas/pom.xml b/pgaas/pom.xml index 66e6d5c..1c8eb5c 100644 --- a/pgaas/pom.xml +++ b/pgaas/pom.xml @@ -72,16 +72,6 @@ limitations under the License. <skipNexusStagingDeployMojo>true</skipNexusStagingDeployMojo> </configuration> </plugin> - - <!-- maven-deploy-plugin is called during deploy but we do not need it --> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-deploy-plugin</artifactId> - <version>2.8</version> - <configuration> - <skip>true</skip> - </configuration> - </plugin> </plugins> </pluginManagement> @@ -148,6 +138,7 @@ limitations under the License. <plugin> <groupId>org.codehaus.mojo</groupId> <artifactId>exec-maven-plugin</artifactId> + <version>1.2.1</version> <executions> <execution> <id>clean phase script</id> diff --git a/pgaas/tests/psycopg2.py b/pgaas/tests/psycopg2.py index fab5333..07b71f4 100644 --- a/pgaas/tests/psycopg2.py +++ b/pgaas/tests/psycopg2.py @@ -22,32 +22,49 @@ This is a mock psycopg2 module. """ -class csr(object): +class MockCursor(object): + """ + mocked cursor + """ def __init__(self, **kwargs): pass - def execute(self, cmd, exc = None): + def execute(self, cmd, exc=None): + """ + mock SQL execution + """ pass - + def close(self): + """ + mock SQL close + """ pass def __iter__(self): return iter([]) - -class conn(object): + +class MockConn(object): # pylint: disable=too-few-public-methods + """ + mock SQL connection + """ def __init__(self, **kwargs): pass def __enter__(self): return self - + def __exit__(self, exc_type, exc_value, traceback): pass - def cursor(self): - return csr() - -def connect(**kwargs): - return conn() + def cursor(self): # pylint: disable=no-self-use + """ + mock return a cursor + """ + return MockCursor() +def connect(**kwargs): # pylint: disable=unused-argument + """ + mock get-a-connection + """ + return MockConn() diff --git a/pgaas/tests/test_plugin.py b/pgaas/tests/test_plugin.py index d3c29b9..0071499 100644 --- a/pgaas/tests/test_plugin.py +++ b/pgaas/tests/test_plugin.py @@ -16,6 +16,12 @@ # limitations under the License. # ============LICENSE_END====================================================== +""" +unit tests for PostgreSQL password plugin +""" + +from __future__ import print_function +# pylint: disable=import-error,unused-import,wrong-import-order import pytest import socket import psycopg2 @@ -28,70 +34,111 @@ from cloudify.state import current_ctx from cloudify.exceptions import NonRecoverableError from cloudify import ctx -import sys, os +import sys +import os sys.path.append(os.path.realpath(os.path.dirname(__file__))) +import traceback -TMPNAME = "/tmp/pgaas_plugin_tests" +TMPNAME = "/tmp/pgaas_plugin_tests_{}".format(os.environ["USER"] if "USER" in os.environ else + os.environ["LOGNAME"] if "LOGNAME" in os.environ else + str(os.getuid())) class MockKeyPair(object): + """ + mock keypair for cloudify contexts + """ def __init__(self, type_hierarchy=None, target=None): self._type_hierarchy = type_hierarchy self._target = target @property def type_hierarchy(self): + """ + return the type hierarchy + """ return self._type_hierarchy @property def target(self): + """ + return the target + """ return self._target -class MockInstance(object): +class MockInstance(object): # pylint: disable=too-few-public-methods + """ + mock instance for cloudify contexts + """ def __init__(self, instance=None): self._instance = instance @property def instance(self): + """ + return the instance + """ return self._instance -class MockRuntimeProperties(object): +class MockRuntimeProperties(object): # pylint: disable=too-few-public-methods + """ + mock runtime properties for cloudify contexts + """ def __init__(self, runtime_properties=None): self._runtime_properties = runtime_properties @property def runtime_properties(self): + """ + return the properties + """ return self._runtime_properties class MockSocket(object): + """ + mock socket interface + """ def __init__(self): pass - def connect(self,host=None,port=None): + def connect(self, host=None, port=None): + """ + mock socket connection + """ pass def close(self): - pass + """ + mock socket close + """ + pass + +def _connect(host, port): # pylint: disable=unused-argument + """ + mock connection + """ + return {} -def _connect(h,p): - return { } - -def set_mock_context(msg, monkeypatch): +def set_mock_context(msg, monkeypatch, writerfqdn='test.bar.example.com'): + """ + establish the mock context for our testing + """ print("================ %s ================" % msg) - os.system("exec >> {0}.out 2>&1; echo Before test".format(TMPNAME)) #### DELETE + # pylint: disable=bad-continuation props = { - 'writerfqdn': 'test.bar.example.com', + 'writerfqdn': writerfqdn, 'use_existing': False, 'readerfqdn': 'test-ro.bar.example.com', 'name': 'testdb', 'port': '5432', 'initialpassword': 'test' } - + sshkeyprops = { 'public': "testpub", 'base64private': "testpriv" } mock_ctx = MockCloudifyContext(node_id='test_node_id', node_name='test_node_name', + # pylint: disable=bad-whitespace properties=props, relationships = [ MockKeyPair(type_hierarchy = @@ -109,55 +156,130 @@ def set_mock_context(msg, monkeypatch): monkeypatch.setattr(socket.socket, 'connect', _connect) # monkeypatch.setattr(psycopg2, 'connect', _connect) pgaas.pgaas_plugin.setOptManagerResources(TMPNAME) - + return mock_ctx @pytest.mark.dependency() -def test_start(monkeypatch): - os.system("exec > {0}.out 2>&1; echo Before any test; rm -rf {0}; mkdir -p {0}".format(TMPNAME)) #### DELETE +def test_start(monkeypatch): # pylint: disable=unused-argument + """ + put anything in here that needs to be done + PRIOR to the tests + """ + pass @pytest.mark.dependency(depends=['test_start']) def test_add_pgaas_cluster(monkeypatch): + """ + test add_pgaas_cluster() + """ try: set_mock_context('test_add_pgaas_cluster', monkeypatch) pgaas.pgaas_plugin.add_pgaas_cluster(args={}) + except Exception as e: + print("Error: {0}".format(e)) + print("Stack: {0}".format(traceback.format_exc())) finally: current_ctx.clear() - os.system("exec >> {0}.out 2>&1; echo After add_pgaas_cluster test; ls -lR {0}; head -1000 /dev/null {0}/pgaas/*;echo".format(TMPNAME)) #### DELETE @pytest.mark.dependency(depends=['test_add_pgaas_cluster']) def test_add_database(monkeypatch): + """ + test add_database() + """ try: set_mock_context('test_add_database', monkeypatch) pgaas.pgaas_plugin.create_database(args={}) + except Exception as e: + print("Error: {0}".format(e)) + print("Stack: {0}".format(traceback.format_exc())) + finally: + current_ctx.clear() + +@pytest.mark.dependency(depends=['test_add_pgaas_cluster']) +def test_bad_add_database(monkeypatch): + """ + test bad_add_database() + """ + try: + set_mock_context('test_add_database', monkeypatch, writerfqdn="bad.bar.example.com") + with pytest.raises(NonRecoverableError): + pgaas.pgaas_plugin.create_database(args={}) + except Exception as e: + print("Error: {0}".format(e)) + print("Stack: {0}".format(traceback.format_exc())) finally: current_ctx.clear() - os.system("exec >> {0}.out 2>&1; echo After add_database test; ls -lR {0}; head -1000 /dev/null {0}/pgaas/*;echo".format(TMPNAME)) #### DELETE @pytest.mark.dependency(depends=['test_add_database']) def test_update_database(monkeypatch): + """ + test update_database() + """ try: - set_mock_context('test_update_database', monkeypatch) - pgaas.pgaas_plugin.update_database(args={}) + ######################################################## + # Subtle test implications regarding: update_database # + # --------------------------------------------------- # + # 1) update_database is a workflow and the context # + # passed to it has 'nodes' attribute which is not # + # not included in MockCloudifyContext # + # 2) the 'nodes' attribute is a list of contexts so # + # we will have to create a sub-context # + # 3) update_database will iterate through each of the # + # nodes contexts looking for the correct one # + # 4) To identify the correct sub-context it will first# + # check each sub-context for the existence of # + # properties attribute # + # 5) ****Mock_context internally saves properties as # + # variable _properties and 'properties' is defined # + # as @property...thus it is not recognized as an # + # attribute...this will cause update_database to # + # fail so we need to explicitly create properties # + # properties attribute in the subcontext # + ######################################################## + + #################### + # Main context # + #################### + myctx = set_mock_context('test_update_database', monkeypatch) + ########################################################### + # Create subcontext and assign it to attribute properties # + # in main context # + ########################################################### + mynode = set_mock_context('test_update_database_node', monkeypatch) + # pylint: disable=protected-access + mynode.properties = mynode._properties + myctx.nodes = [mynode] + pgaas.pgaas_plugin.update_database(ctx=myctx, args={}) + except Exception as e: + print("Error: {0}".format(e)) + print("Stack: {0}".format(traceback.format_exc())) finally: current_ctx.clear() - os.system("exec >> {0}.out 2>&1; echo After update_database test; ls -lR {0}; head -1000 /dev/null {0}/pgaas/*;echo".format(TMPNAME)) #### DELETE @pytest.mark.dependency(depends=['test_update_database']) def test_delete_database(monkeypatch): + """ + test delete_database() + """ try: set_mock_context('test_delete_database', monkeypatch) pgaas.pgaas_plugin.delete_database(args={}) + except Exception as e: + print("Error: {0}".format(e)) + print("Stack: {0}".format(traceback.format_exc())) finally: current_ctx.clear() - os.system("exec >> {0}.out 2>&1; echo After delete_database test; ls -lR {0}; head -1000 /dev/null {0}/pgaas/*;echo".format(TMPNAME)) #### DELETE @pytest.mark.dependency(depends=['test_delete_database']) def test_rm_pgaas_cluster(monkeypatch): + """ + test rm_pgaas_cluster() + """ try: set_mock_context('test_rm_pgaas_cluster', monkeypatch) pgaas.pgaas_plugin.rm_pgaas_cluster(args={}) + except Exception as e: + print("Error: {0}".format(e)) + print("Stack: {0}".format(traceback.format_exc())) finally: current_ctx.clear() - os.system("exec >> {0}.out 2>&1; echo After delete_database test; ls -lR {0}; head -1000 /dev/null {0}/pgaas/*;echo".format(TMPNAME)) #### DELETE - diff --git a/pgaas/tox.ini b/pgaas/tox.ini index e0a1682..4a2f99e 100644 --- a/pgaas/tox.ini +++ b/pgaas/tox.ini @@ -6,9 +6,9 @@ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -18,6 +18,13 @@ [tox] envlist = py27 +# The PGaaS plugin uses several Cloudify mock libraries, one of which +# is not compatible with Python3. +# Until we get an updated version of that Cloudify mock libraries, +# we will have to leave out py3X from the tox tests. +# We cannot use py37 yet because Jenkins returns: +# InterpreterNotFound: python3.7 +#envlist = py27,py36 [testenv] deps= pytest |