summaryrefslogtreecommitdiffstats
path: root/pgaas/tests
diff options
context:
space:
mode:
Diffstat (limited to 'pgaas/tests')
-rw-r--r--pgaas/tests/psycopg2.py39
-rw-r--r--pgaas/tests/test_plugin.py170
2 files changed, 174 insertions, 35 deletions
diff --git a/pgaas/tests/psycopg2.py b/pgaas/tests/psycopg2.py
index fab5333..07b71f4 100644
--- a/pgaas/tests/psycopg2.py
+++ b/pgaas/tests/psycopg2.py
@@ -22,32 +22,49 @@ This is a mock psycopg2 module.
"""
-class csr(object):
+class MockCursor(object):
+ """
+ mocked cursor
+ """
def __init__(self, **kwargs):
pass
- def execute(self, cmd, exc = None):
+ def execute(self, cmd, exc=None):
+ """
+ mock SQL execution
+ """
pass
-
+
def close(self):
+ """
+ mock SQL close
+ """
pass
def __iter__(self):
return iter([])
-
-class conn(object):
+
+class MockConn(object): # pylint: disable=too-few-public-methods
+ """
+ mock SQL connection
+ """
def __init__(self, **kwargs):
pass
def __enter__(self):
return self
-
+
def __exit__(self, exc_type, exc_value, traceback):
pass
- def cursor(self):
- return csr()
-
-def connect(**kwargs):
- return conn()
+ def cursor(self): # pylint: disable=no-self-use
+ """
+ mock return a cursor
+ """
+ return MockCursor()
+def connect(**kwargs): # pylint: disable=unused-argument
+ """
+ mock get-a-connection
+ """
+ return MockConn()
diff --git a/pgaas/tests/test_plugin.py b/pgaas/tests/test_plugin.py
index d3c29b9..0071499 100644
--- a/pgaas/tests/test_plugin.py
+++ b/pgaas/tests/test_plugin.py
@@ -16,6 +16,12 @@
# limitations under the License.
# ============LICENSE_END======================================================
+"""
+unit tests for PostgreSQL password plugin
+"""
+
+from __future__ import print_function
+# pylint: disable=import-error,unused-import,wrong-import-order
import pytest
import socket
import psycopg2
@@ -28,70 +34,111 @@ from cloudify.state import current_ctx
from cloudify.exceptions import NonRecoverableError
from cloudify import ctx
-import sys, os
+import sys
+import os
sys.path.append(os.path.realpath(os.path.dirname(__file__)))
+import traceback
-TMPNAME = "/tmp/pgaas_plugin_tests"
+TMPNAME = "/tmp/pgaas_plugin_tests_{}".format(os.environ["USER"] if "USER" in os.environ else
+ os.environ["LOGNAME"] if "LOGNAME" in os.environ else
+ str(os.getuid()))
class MockKeyPair(object):
+ """
+ mock keypair for cloudify contexts
+ """
def __init__(self, type_hierarchy=None, target=None):
self._type_hierarchy = type_hierarchy
self._target = target
@property
def type_hierarchy(self):
+ """
+ return the type hierarchy
+ """
return self._type_hierarchy
@property
def target(self):
+ """
+ return the target
+ """
return self._target
-class MockInstance(object):
+class MockInstance(object): # pylint: disable=too-few-public-methods
+ """
+ mock instance for cloudify contexts
+ """
def __init__(self, instance=None):
self._instance = instance
@property
def instance(self):
+ """
+ return the instance
+ """
return self._instance
-class MockRuntimeProperties(object):
+class MockRuntimeProperties(object): # pylint: disable=too-few-public-methods
+ """
+ mock runtime properties for cloudify contexts
+ """
def __init__(self, runtime_properties=None):
self._runtime_properties = runtime_properties
@property
def runtime_properties(self):
+ """
+ return the properties
+ """
return self._runtime_properties
class MockSocket(object):
+ """
+ mock socket interface
+ """
def __init__(self):
pass
- def connect(self,host=None,port=None):
+ def connect(self, host=None, port=None):
+ """
+ mock socket connection
+ """
pass
def close(self):
- pass
+ """
+ mock socket close
+ """
+ pass
+
+def _connect(host, port): # pylint: disable=unused-argument
+ """
+ mock connection
+ """
+ return {}
-def _connect(h,p):
- return { }
-
-def set_mock_context(msg, monkeypatch):
+def set_mock_context(msg, monkeypatch, writerfqdn='test.bar.example.com'):
+ """
+ establish the mock context for our testing
+ """
print("================ %s ================" % msg)
- os.system("exec >> {0}.out 2>&1; echo Before test".format(TMPNAME)) #### DELETE
+ # pylint: disable=bad-continuation
props = {
- 'writerfqdn': 'test.bar.example.com',
+ 'writerfqdn': writerfqdn,
'use_existing': False,
'readerfqdn': 'test-ro.bar.example.com',
'name': 'testdb',
'port': '5432',
'initialpassword': 'test'
}
-
+
sshkeyprops = {
'public': "testpub",
'base64private': "testpriv"
}
mock_ctx = MockCloudifyContext(node_id='test_node_id', node_name='test_node_name',
+ # pylint: disable=bad-whitespace
properties=props,
relationships = [
MockKeyPair(type_hierarchy =
@@ -109,55 +156,130 @@ def set_mock_context(msg, monkeypatch):
monkeypatch.setattr(socket.socket, 'connect', _connect)
# monkeypatch.setattr(psycopg2, 'connect', _connect)
pgaas.pgaas_plugin.setOptManagerResources(TMPNAME)
-
+ return mock_ctx
@pytest.mark.dependency()
-def test_start(monkeypatch):
- os.system("exec > {0}.out 2>&1; echo Before any test; rm -rf {0}; mkdir -p {0}".format(TMPNAME)) #### DELETE
+def test_start(monkeypatch): # pylint: disable=unused-argument
+ """
+ put anything in here that needs to be done
+ PRIOR to the tests
+ """
+ pass
@pytest.mark.dependency(depends=['test_start'])
def test_add_pgaas_cluster(monkeypatch):
+ """
+ test add_pgaas_cluster()
+ """
try:
set_mock_context('test_add_pgaas_cluster', monkeypatch)
pgaas.pgaas_plugin.add_pgaas_cluster(args={})
+ except Exception as e:
+ print("Error: {0}".format(e))
+ print("Stack: {0}".format(traceback.format_exc()))
finally:
current_ctx.clear()
- os.system("exec >> {0}.out 2>&1; echo After add_pgaas_cluster test; ls -lR {0}; head -1000 /dev/null {0}/pgaas/*;echo".format(TMPNAME)) #### DELETE
@pytest.mark.dependency(depends=['test_add_pgaas_cluster'])
def test_add_database(monkeypatch):
+ """
+ test add_database()
+ """
try:
set_mock_context('test_add_database', monkeypatch)
pgaas.pgaas_plugin.create_database(args={})
+ except Exception as e:
+ print("Error: {0}".format(e))
+ print("Stack: {0}".format(traceback.format_exc()))
+ finally:
+ current_ctx.clear()
+
+@pytest.mark.dependency(depends=['test_add_pgaas_cluster'])
+def test_bad_add_database(monkeypatch):
+ """
+ test bad_add_database()
+ """
+ try:
+ set_mock_context('test_add_database', monkeypatch, writerfqdn="bad.bar.example.com")
+ with pytest.raises(NonRecoverableError):
+ pgaas.pgaas_plugin.create_database(args={})
+ except Exception as e:
+ print("Error: {0}".format(e))
+ print("Stack: {0}".format(traceback.format_exc()))
finally:
current_ctx.clear()
- os.system("exec >> {0}.out 2>&1; echo After add_database test; ls -lR {0}; head -1000 /dev/null {0}/pgaas/*;echo".format(TMPNAME)) #### DELETE
@pytest.mark.dependency(depends=['test_add_database'])
def test_update_database(monkeypatch):
+ """
+ test update_database()
+ """
try:
- set_mock_context('test_update_database', monkeypatch)
- pgaas.pgaas_plugin.update_database(args={})
+ ########################################################
+ # Subtle test implications regarding: update_database #
+ # --------------------------------------------------- #
+ # 1) update_database is a workflow and the context #
+ # passed to it has 'nodes' attribute which is not #
+ # not included in MockCloudifyContext #
+ # 2) the 'nodes' attribute is a list of contexts so #
+ # we will have to create a sub-context #
+ # 3) update_database will iterate through each of the #
+ # nodes contexts looking for the correct one #
+ # 4) To identify the correct sub-context it will first#
+ # check each sub-context for the existence of #
+ # properties attribute #
+ # 5) ****Mock_context internally saves properties as #
+ # variable _properties and 'properties' is defined #
+ # as @property...thus it is not recognized as an #
+ # attribute...this will cause update_database to #
+ # fail so we need to explicitly create properties #
+ # properties attribute in the subcontext #
+ ########################################################
+
+ ####################
+ # Main context #
+ ####################
+ myctx = set_mock_context('test_update_database', monkeypatch)
+ ###########################################################
+ # Create subcontext and assign it to attribute properties #
+ # in main context #
+ ###########################################################
+ mynode = set_mock_context('test_update_database_node', monkeypatch)
+ # pylint: disable=protected-access
+ mynode.properties = mynode._properties
+ myctx.nodes = [mynode]
+ pgaas.pgaas_plugin.update_database(ctx=myctx, args={})
+ except Exception as e:
+ print("Error: {0}".format(e))
+ print("Stack: {0}".format(traceback.format_exc()))
finally:
current_ctx.clear()
- os.system("exec >> {0}.out 2>&1; echo After update_database test; ls -lR {0}; head -1000 /dev/null {0}/pgaas/*;echo".format(TMPNAME)) #### DELETE
@pytest.mark.dependency(depends=['test_update_database'])
def test_delete_database(monkeypatch):
+ """
+ test delete_database()
+ """
try:
set_mock_context('test_delete_database', monkeypatch)
pgaas.pgaas_plugin.delete_database(args={})
+ except Exception as e:
+ print("Error: {0}".format(e))
+ print("Stack: {0}".format(traceback.format_exc()))
finally:
current_ctx.clear()
- os.system("exec >> {0}.out 2>&1; echo After delete_database test; ls -lR {0}; head -1000 /dev/null {0}/pgaas/*;echo".format(TMPNAME)) #### DELETE
@pytest.mark.dependency(depends=['test_delete_database'])
def test_rm_pgaas_cluster(monkeypatch):
+ """
+ test rm_pgaas_cluster()
+ """
try:
set_mock_context('test_rm_pgaas_cluster', monkeypatch)
pgaas.pgaas_plugin.rm_pgaas_cluster(args={})
+ except Exception as e:
+ print("Error: {0}".format(e))
+ print("Stack: {0}".format(traceback.format_exc()))
finally:
current_ctx.clear()
- os.system("exec >> {0}.out 2>&1; echo After delete_database test; ls -lR {0}; head -1000 /dev/null {0}/pgaas/*;echo".format(TMPNAME)) #### DELETE
-