1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
|
# ============LICENSE_START====================================================
# org.onap.ccsdk
# =============================================================================
# Copyright (c) 2017 AT&T Intellectual Property. All rights reserved.
# =============================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============LICENSE_END======================================================
import pytest
import socket
import psycopg2
import pgaas.pgaas_plugin
from cloudify.mocks import MockCloudifyContext
from cloudify.mocks import MockNodeContext
from cloudify.mocks import MockNodeInstanceContext
from cloudify.mocks import MockRelationshipSubjectContext
from cloudify.state import current_ctx
from cloudify.exceptions import NonRecoverableError
from cloudify import ctx
import sys, os
sys.path.append(os.path.realpath(os.path.dirname(__file__)))
class MockKeyPair(object):
    """Minimal stand-in for a relationship entry of the mocked context.

    Exposes exactly two read-only attributes, ``type_hierarchy`` and
    ``target``, holding whatever values were passed to the constructor.
    """

    def __init__(self, type_hierarchy=None, target=None):
        """Store the relationship's type hierarchy and target object.

        :param type_hierarchy: value returned by ``type_hierarchy``
            (the test setup passes a list of relationship type names).
        :param target: value returned by ``target``.
        """
        self._type_hierarchy = type_hierarchy
        self._target = target

    @property
    def type_hierarchy(self):
        """Return the ``type_hierarchy`` value given at construction."""
        return self._type_hierarchy

    @property
    def target(self):
        """Return the ``target`` value given at construction."""
        return self._target
class MockInstance(object):
    """Minimal stand-in for a relationship target.

    Wraps a single object and exposes it through the read-only
    ``instance`` attribute.
    """

    def __init__(self, instance=None):
        """Store the object to be returned by ``instance``.

        :param instance: wrapped object; defaults to ``None``.
        """
        self._instance = instance

    @property
    def instance(self):
        """Return the object given at construction."""
        return self._instance
class MockRuntimeProperties(object):
    """Minimal stand-in for a node instance carrying runtime properties.

    Exposes the value passed to the constructor through the read-only
    ``runtime_properties`` attribute.
    """

    def __init__(self, runtime_properties=None):
        """Store the value to be returned by ``runtime_properties``.

        :param runtime_properties: stored as-is (not copied); defaults to
            ``None``.
        """
        self._runtime_properties = runtime_properties

    @property
    def runtime_properties(self):
        """Return the value given at construction."""
        return self._runtime_properties
def _connect(h,p):
return { }
def set_mock_context(msg, monkeypatch):
    """Install a mocked Cloudify context for a single plugin test.

    Prints a banner containing ``msg``, builds a ``MockCloudifyContext``
    with fixed node properties, one ssh-keypair relationship and fixed
    runtime properties, and makes it the current context. It also replaces
    ``socket.socket.connect`` with ``_connect`` and points the plugin at
    ``/tmp`` through ``setOptManagerResources``.

    The caller is responsible for calling ``current_ctx.clear()`` when the
    test is over.

    :param msg: label printed in the banner (the tests pass their own name).
    :param monkeypatch: pytest ``monkeypatch`` fixture used for patching.
    """
    print("================ %s ================" % msg)

    props = {
        'writerfqdn': 'test.bar.example.com',
        'use_existing': False,
        'readerfqdn': 'test-ro.bar.example.com',
        'name': 'testdb'
    }

    sshkeyprops = {
        'public': "testpub",
        'base64private': "testpriv"
    }

    # The single relationship exposes the key pair through
    # relationship.target.instance.runtime_properties.
    keypair_relationship = MockKeyPair(
        type_hierarchy=["dcae.relationships.pgaas_cluster_uses_sshkeypair"],
        target=MockInstance(MockRuntimeProperties(sshkeyprops))
    )

    mock_ctx = MockCloudifyContext(
        node_id='test_node_id',
        node_name='test_node_name',
        properties=props,
        relationships=[keypair_relationship],
        runtime_properties={
            "admin": {"user": "admin_user"},
            "user": {"user": "user_user"},
            "viewer": {"user": "viewer_user"}
        }
    )
    current_ctx.set(mock_ctx)

    # Patched on the class so every socket created afterwards is affected;
    # monkeypatch restores the real method when the test finishes.
    monkeypatch.setattr(socket.socket, 'connect', _connect)
    # NOTE(review): psycopg2.connect is left unpatched (the line below is
    # disabled) - confirm whether the plugin operations need it mocked.
    # monkeypatch.setattr(psycopg2, 'connect', _connect)

    pgaas.pgaas_plugin.setOptManagerResources("/tmp")
@pytest.mark.dependency()
def test_add_pgaas_cluster(monkeypatch):
    """Call the plugin's ``add_pgaas_cluster`` under the mocked context.

    The test passes when the call returns without raising. It is
    registered with the ``dependency`` marker so that later tests can
    declare a dependency on it.

    :param monkeypatch: pytest ``monkeypatch`` fixture, forwarded to
        ``set_mock_context``.
    """
    try:
        set_mock_context('test_add_pgaas_cluster', monkeypatch)
        pgaas.pgaas_plugin.add_pgaas_cluster(args={})
    finally:
        # Always drop the mocked context so it cannot leak into other tests.
        current_ctx.clear()
class MockSocket(object):
    """Socket stand-in whose ``connect`` and ``close`` do nothing.

    Takes no constructor arguments and keeps no state.
    """

    def connect(self, host=None, port=None):
        """Accept a host and port and do nothing.

        :param host: ignored.
        :param port: ignored.
        :returns: ``None``.
        """
        pass

    def close(self):
        """Do nothing and return ``None``."""
        pass
@pytest.mark.dependency(depends=['test_add_pgaas_cluster'])
def test_add_database(monkeypatch):
    """Call the plugin's ``create_database`` under the mocked context.

    The test passes when the call returns without raising. It declares a
    dependency on ``test_add_pgaas_cluster``.

    :param monkeypatch: pytest ``monkeypatch`` fixture, forwarded to
        ``set_mock_context``.
    """
    try:
        set_mock_context('test_add_database', monkeypatch)
        pgaas.pgaas_plugin.create_database(args={})
    finally:
        # Always drop the mocked context so it cannot leak into other tests.
        current_ctx.clear()
@pytest.mark.dependency(depends=['test_add_database'])
def test_delete_database(monkeypatch):
    """Call the plugin's ``delete_database`` under the mocked context.

    The test passes when the call returns without raising. It declares a
    dependency on ``test_add_database``.

    :param monkeypatch: pytest ``monkeypatch`` fixture, forwarded to
        ``set_mock_context``.
    """
    try:
        set_mock_context('test_delete_database', monkeypatch)
        pgaas.pgaas_plugin.delete_database(args={})
    finally:
        # Always drop the mocked context so it cannot leak into other tests.
        current_ctx.clear()
@pytest.mark.dependency(depends=['test_delete_database'])
def test_rm_pgaas_cluster(monkeypatch):
    """Call the plugin's ``rm_pgaas_cluster`` under the mocked context.

    The test passes when the call returns without raising. It declares a
    dependency on ``test_delete_database``.

    :param monkeypatch: pytest ``monkeypatch`` fixture, forwarded to
        ``set_mock_context``.
    """
    try:
        set_mock_context('test_rm_pgaas_cluster', monkeypatch)
        pgaas.pgaas_plugin.rm_pgaas_cluster(args={})
    finally:
        # Always drop the mocked context so it cannot leak into other tests.
        current_ctx.clear()
|