diff options
Diffstat (limited to 'pgaas/pgaas/pgaas_plugin.py')
-rw-r--r-- | pgaas/pgaas/pgaas_plugin.py | 184 |
1 files changed, 64 insertions, 120 deletions
diff --git a/pgaas/pgaas/pgaas_plugin.py b/pgaas/pgaas/pgaas_plugin.py index c0cab67..bfeeba0 100644 --- a/pgaas/pgaas/pgaas_plugin.py +++ b/pgaas/pgaas/pgaas_plugin.py @@ -1,28 +1,8 @@ -# org.onap.ccsdk -# ============LICENSE_START==================================================== -# ============================================================================= -# Copyright (c) 2017-2018 AT&T Intellectual Property. All rights reserved. -# ============================================================================= -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# ============LICENSE_END====================================================== - from cloudify import ctx from cloudify.decorators import operation from cloudify.exceptions import NonRecoverableError from cloudify.exceptions import RecoverableError -from logginginterface import * - import os import re import json @@ -63,22 +43,8 @@ sys.path = opath # or: writerfqdn: { get_property: [ dns_pgrs_rw, fqdn ] } use_existing: true - To initialize an existing server to be managed by pgaas_plugin:: - - https://$NEXUS/repository/raw/type_files/sshkeyshare/sshkey_types.yaml - - https://$NEXUS/repository/raw/type_files/pgaas_types.yaml - pgaas_cluster: - type: dcae.nodes.pgaas.cluster - properties: - writerfqdn: { concat: [ { get_input: location_prefix }, '-', { get_input: pgaas_cluster_name }, '-write.', { get_input: location_domain } ] } - readerfqdn: { concat: [ { get_input: location_prefix }, '-', { get_input: pgaas_cluster_name }, '.', { get_input: location_domain } ] } - initialpassword: { get_input: currentpassword } - relationships: - - type: dcae.relationships.pgaas_cluster_uses_sshkeypair - target: sharedsshkey_pgrs - - { get_attribute: [ pgaas_cluster, public ] } - { get_attribute: [ pgaas_cluster, base64private ] } - # - { get_attribute: [ pgaas_cluster, postgrespswd ] } To set up a database: @@ -137,12 +103,33 @@ def setOptManagerResources(o): def safestr(s): return urllib.quote(str(s), '') +def debug(msg): + """ + Print a debugging message. + This is a handy endpoint to add other extended debugging calls. + """ + ctx.logger.info(msg) + +def warn(msg): + """ + Print a warning message. + This is a handy endpoint to add other extended warning calls. + """ + ctx.logger.warn(msg) + +def info(msg): + """ + Print a info message. + This is a handy endpoint to add other extended info calls. + """ + ctx.logger.info(msg) + def raiseRecoverableError(msg): """ Print a warning message and raise a RecoverableError exception. This is a handy endpoint to add other extended debugging calls. """ - warn(msg) + ctx.logger.warn(msg) raise RecoverableError(msg) def raiseNonRecoverableError(msg): @@ -150,12 +137,9 @@ def raiseNonRecoverableError(msg): Print an error message and raise a NonRecoverableError exception. This is a handy endpoint to add other extended debugging calls. """ - error(msg) + ctx.logger.error(msg) raise NonRecoverableError(msg) -def dbexecute(crx, cmd, args=None): - debug("executing {}".format(cmd)) - crx.execute(cmd, args) def waithp(host, port): """ @@ -164,7 +148,7 @@ def waithp(host, port): debug("waithp({0},{1})".format(safestr(host),safestr(port))) sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) try: - sock.connect((host, int(port))) + sock.connect((host, port)) except: a, b, c = sys.exc_info() traceback.print_exception(a, b, c) @@ -176,57 +160,30 @@ def doconn(desc): """ open an SQL connection to the PG server """ - debug("doconn({},{},{})".format(desc['host'], desc['user'],desc['database'])) - # debug("doconn({},{},{},{})".format(desc['host'], desc['user'],desc['database'],desc['password'])) + debug("doconn()") ret = psycopg2.connect(**desc) ret.autocommit = True return ret -def hostportion(hostport): - """ - return the host portion of a fqdn:port or IPv4:port or [IPv6]:port - """ - ipv4re = re.match(r"^([^:]+)(:(\d+))?", hostport) - ipv6re = re.match(r"^[[]([^]]+)[]](:(\d+))?", hostport) - if ipv4re: - return ipv4re.group(1) - if ipv6re: - return ipv6re.group(1) - raiseNonRecoverableError("invalid hostport: {}".format(hostport)) - -def portportion(hostport): - """ - Return the port portion of a fqdn:port or IPv4:port or [IPv6]:port. - If port is not present, return 5432. - """ - ipv6re = re.match(r"^[[]([^]]+)[]](:(\d+))?", hostport) - ipv4re = re.match(r"^([^:]+)(:(\d+))?", hostport) - if ipv4re: - return ipv4re.group(3) if ipv4re.group(3) else '5432' - if ipv6re: - return ipv6re.group(3) if ipv6re.group(3) else '5432' - raiseNonRecoverableError("invalid hostport: {}".format(hostport)) - -def rootdesc(data, dbname, initialpassword=None): +def rootdesc(data, dbname): """ return the postgres connection information """ debug("rootdesc(..data..,{0})".format(safestr(dbname))) return { 'database': dbname, - 'host': hostportion(data['rw']), - 'port': portportion(data['rw']), + 'host': data['rw'], 'user': 'postgres', - 'password': initialpassword if initialpassword else getpass(data, 'postgres') + 'password': getpass(data, 'postgres') } -def rootconn(data, dbname='postgres', initialpassword=None): +def rootconn(data, dbname='postgres'): """ connect to a given server as postgres, connecting to the specified database """ debug("rootconn(..data..,{0})".format(safestr(dbname))) - ret = doconn(rootdesc(data, dbname, initialpassword)) + ret = doconn(rootdesc(data, dbname)) return ret def onedesc(data, dbname, role, access): @@ -236,8 +193,7 @@ def onedesc(data, dbname, role, access): user = '{0}_{1}'.format(dbname, role) return { 'database': dbname, - 'host': hostportion(data[access]), - 'port': portportion(data[access]), + 'host': data[access], 'user': user, 'password': getpass(data, user) } @@ -256,7 +212,7 @@ def getpass(data, ident): """ generate the password for a given user on a specific server """ - m = hashlib.sha256() + m = hashlib.md5() m.update(ident) m.update(base64.b64decode(data['data'])) return m.hexdigest() @@ -278,10 +234,7 @@ def chkfqdn(fqdn): """ verify that a FQDN is valid """ - hp = hostportion(fqdn) - pp = portportion(fqdn) - # TODO need to augment this for IPv6 addresses - return re.match('^[a-zA-Z0-9_-]+(\\.[a-zA-Z0-9_-]+)+$', hp) is not None + return re.match('^[a-zA-Z0-9_-]+(\\.[a-zA-Z0-9_-]+)+$', fqdn) is not None def chkdbname(dbname): """ @@ -291,13 +244,13 @@ def chkdbname(dbname): if not ret: warn("Invalid dbname: {0}".format(safestr(dbname))) return ret -def getclusterinfo(wfqdn, reuse, rfqdn, initialpassword, related): +def getclusterinfo(wfqdn, reuse, rfqdn, related): """ Retrieve all of the information specific to a cluster. if reuse, retrieve it else create and store it """ - debug("getclusterinfo({}, {}, {}, {}, ..related..)".format(safestr(wfqdn), safestr(reuse), safestr(rfqdn), safestr(initialpassword))) + debug("getclusterinfo({0}, {1}, {2},..related..)".format(safestr(wfqdn), safestr(reuse), safestr(rfqdn))) if not chkfqdn(wfqdn): raiseNonRecoverableError('Invalid FQDN specified for admin/read-write access, fqdn={0}'.format(safestr(wfqdn))) if reuse: @@ -321,7 +274,7 @@ def getclusterinfo(wfqdn, reuse, rfqdn, initialpassword, related): raiseNonRecoverableError('Invalid FQDN specified for read-only access, fqdn={0}'.format(safestr(rfqdn))) if len(related) != 1: raiseNonRecoverableError('Cluster SSH keypair must be specified using a dcae.relationships.pgaas_cluster_uses_sshkeypair relationship to a dcae.nodes.sshkeypair node') - data = { 'ro': rfqdn, 'pubkey': related[0].instance.runtime_properties['public'], 'data': related[0].instance.runtime_properties['base64private'], 'hash': 'sha256' } + data = { 'ro': rfqdn, 'pubkey': related[0].instance.runtime_properties['public'], 'data': related[0].instance.runtime_properties['base64private'] } os.umask(077) try: os.makedirs('{0}/pgaas'.format(OPT_MANAGER_RESOURCES)) @@ -335,12 +288,8 @@ def getclusterinfo(wfqdn, reuse, rfqdn, initialpassword, related): warn("Stack: {0}".format(traceback.format_exc())) raiseNonRecoverableError('Cannot write cluster information to {0}/pgaas: fqdn={1}, err={2}'.format(OPT_MANAGER_RESOURCES, safestr(wfqdn),e)) data['rw'] = wfqdn - if initialpassword: - with rootconn(data, initialpassword=initialpassword) as conn: - crr = conn.cursor() - dbexecute(crr, "ALTER USER postgres WITH PASSWORD %s", (getpass(data, 'postgres'),)) - crr.close() return(data) + @operation def add_pgaas_cluster(**kwargs): @@ -350,14 +299,9 @@ def add_pgaas_cluster(**kwargs): """ try: warn("add_pgaas_cluster() invoked") - data = getclusterinfo(ctx.node.properties['writerfqdn'], - ctx.node.properties['use_existing'], - ctx.node.properties['readerfqdn'], - ctx.node.properties['initialpassword'], - find_related_nodes('dcae.relationships.pgaas_cluster_uses_sshkeypair')) + data = getclusterinfo(ctx.node.properties['writerfqdn'], ctx.node.properties['use_existing'], ctx.node.properties['readerfqdn'], find_related_nodes('dcae.relationships.pgaas_cluster_uses_sshkeypair')) ctx.instance.runtime_properties['public'] = data['pubkey'] ctx.instance.runtime_properties['base64private'] = data['data'] - # ctx.instance.runtime_properties['postgrespswd'] = getpass(data, 'postgres') warn('All done') except Exception as e: ctx.logger.warn("Error: {0}".format(e)) @@ -394,8 +338,8 @@ def dbgetinfo(refctx): wfqdn = related[0].node.properties['writerfqdn'] if not chkfqdn(wfqdn): raiseNonRecoverableError('Invalid FQDN specified for admin/read-write access, fqdn={0}'.format(safestr(wfqdn))) - ret = getclusterinfo(wfqdn, True, '', '', []) - waithp(hostportion(wfqdn), portportion(wfqdn)) + ret = getclusterinfo(wfqdn, True, '', []) + waithp(wfqdn, 5432) return ret @operation @@ -419,13 +363,13 @@ def create_database(**kwargs): ctx.instance.runtime_properties['viewer'] = descs['viewer'] with rootconn(info) as conn: crx = conn.cursor() - dbexecute(crx,'SELECT datname FROM pg_database WHERE datistemplate = false') + crx.execute('SELECT datname FROM pg_database WHERE datistemplate = false') existingdbs = [ x[0] for x in crx ] if ctx.node.properties['use_existing']: if dbname not in existingdbs: raiseNonRecoverableError('use_existing specified but database does not exist, dbname={0}'.format(safestr(dbname))) return - dbexecute(crx,'SELECT rolname FROM pg_roles') + crx.execute('SELECT rolname FROM pg_roles') existingroles = [ x[0] for x in crx ] admu = descs['admin']['user'] usru = descs['user']['user'] @@ -434,36 +378,36 @@ def create_database(**kwargs): cvwr = '{0}_common_viewer_role'.format(dbname) schm = '{0}_db_common'.format(dbname) if admu not in existingroles: - dbexecute(crx,'CREATE USER {0} WITH PASSWORD %s'.format(admu), (descs['admin']['password'],)) + crx.execute('CREATE USER {0} WITH PASSWORD %s'.format(admu), (descs['admin']['password'],)) if usru not in existingroles: - dbexecute(crx,'CREATE USER {0} WITH PASSWORD %s'.format(usru), (descs['user']['password'],)) + crx.execute('CREATE USER {0} WITH PASSWORD %s'.format(usru), (descs['user']['password'],)) if vwru not in existingroles: - dbexecute(crx,'CREATE USER {0} WITH PASSWORD %s'.format(vwru), (descs['viewer']['password'],)) + crx.execute('CREATE USER {0} WITH PASSWORD %s'.format(vwru), (descs['viewer']['password'],)) if cusr not in existingroles: - dbexecute(crx,'CREATE ROLE {0}'.format(cusr)) + crx.execute('CREATE ROLE {0}'.format(cusr)) if cvwr not in existingroles: - dbexecute(crx,'CREATE ROLE {0}'.format(cvwr)) + crx.execute('CREATE ROLE {0}'.format(cvwr)) if dbname not in existingdbs: - dbexecute(crx,'CREATE DATABASE {0} WITH OWNER {1}'.format(dbname, admu)) + crx.execute('CREATE DATABASE {0} WITH OWNER {1}'.format(dbname, admu)) crx.close() with rootconn(info, dbname) as dbconn: crz = dbconn.cursor() for r in [ cusr, cvwr, usru, vwru ]: - dbexecute(crz,'REVOKE ALL ON DATABASE {0} FROM {1}'.format(dbname, r)) - dbexecute(crz,'GRANT {0} TO {1}'.format(cvwr, cusr)) - dbexecute(crz,'GRANT {0} TO {1}'.format(cusr, admu)) - dbexecute(crz,'GRANT CONNECT ON DATABASE {0} TO {1}'.format(dbname, cvwr)) - dbexecute(crz,'CREATE SCHEMA IF NOT EXISTS {0} AUTHORIZATION {1}'.format(schm, admu)) + crz.execute('REVOKE ALL ON DATABASE {0} FROM {1}'.format(dbname, r)) + crz.execute('GRANT {0} TO {1}'.format(cvwr, cusr)) + crz.execute('GRANT {0} TO {1}'.format(cusr, admu)) + crz.execute('GRANT CONNECT ON DATABASE {0} TO {1}'.format(dbname, cvwr)) + crz.execute('CREATE SCHEMA IF NOT EXISTS {0} AUTHORIZATION {1}'.format(schm, admu)) for r in [ admu, cusr, cvwr, usru, vwru ]: - dbexecute(crz,'ALTER ROLE {0} IN DATABASE {1} SET search_path = public, {2}'.format(r, dbname, schm)) - dbexecute(crz,'GRANT USAGE ON SCHEMA {0} to {1}'.format(schm, cvwr)) - dbexecute(crz,'GRANT CREATE ON SCHEMA {0} to {1}'.format(schm, admu)) - dbexecute(crz,'ALTER DEFAULT PRIVILEGES FOR ROLE {0} GRANT SELECT ON TABLES TO {1}'.format(admu, cvwr)) - dbexecute(crz,'ALTER DEFAULT PRIVILEGES FOR ROLE {0} GRANT INSERT, UPDATE, DELETE, TRUNCATE ON TABLES TO {1}'.format(admu, cusr)) - dbexecute(crz,'ALTER DEFAULT PRIVILEGES FOR ROLE {0} GRANT USAGE, SELECT, UPDATE ON SEQUENCES TO {1}'.format(admu, cusr)) - dbexecute(crz,'GRANT TEMP ON DATABASE {0} TO {1}'.format(dbname, cusr)) - dbexecute(crz,'GRANT {0} to {1}'.format(cusr, usru)) - dbexecute(crz,'GRANT {0} to {1}'.format(cvwr, vwru)) + crz.execute('ALTER ROLE {0} IN DATABASE {1} SET search_path = public, {2}'.format(r, dbname, schm)) + crz.execute('GRANT USAGE ON SCHEMA {0} to {1}'.format(schm, cvwr)) + crz.execute('GRANT CREATE ON SCHEMA {0} to {1}'.format(schm, admu)) + crz.execute('ALTER DEFAULT PRIVILEGES FOR ROLE {0} GRANT SELECT ON TABLES TO {1}'.format(admu, cvwr)) + crz.execute('ALTER DEFAULT PRIVILEGES FOR ROLE {0} GRANT INSERT, UPDATE, DELETE, TRUNCATE ON TABLES TO {1}'.format(admu, cusr)) + crz.execute('ALTER DEFAULT PRIVILEGES FOR ROLE {0} GRANT USAGE, SELECT, UPDATE ON SEQUENCES TO {1}'.format(admu, cusr)) + crz.execute('GRANT TEMP ON DATABASE {0} TO {1}'.format(dbname, cusr)) + crz.execute('GRANT {0} to {1}'.format(cusr, usru)) + crz.execute('GRANT {0} to {1}'.format(cvwr, vwru)) crz.close() warn('All done') except Exception as e: @@ -496,9 +440,9 @@ def delete_database(**kwargs): vwru = ctx.instance.runtime_properties['viewer']['user'] cusr = '{0}_common_user_role'.format(dbname) cvwr = '{0}_common_viewer_role'.format(dbname) - dbexecute(crx,'DROP DATABASE IF EXISTS {0}'.format(dbname)) + crx.execute('DROP DATABASE IF EXISTS {0}'.format(dbname)) for r in [ usru, vwru, admu, cusr, cvwr ]: - dbexecute(crx,'DROP ROLE IF EXISTS {0}'.format(r)) + crx.execute('DROP ROLE IF EXISTS {0}'.format(r)) warn('All gone') except Exception as e: ctx.logger.warn("Error: {0}".format(e)) |