summaryrefslogtreecommitdiffstats
path: root/dnsdesig/tests/test_plugin.py
diff options
context:
space:
mode:
authorAndrew Gauld <ag1282@att.com>2017-08-23 13:57:51 -0400
committerAndrew Gauld <ag1282@att.com>2017-08-23 13:58:41 -0400
commit26d8a24ade044273fe585df50f6715f8582f74e9 (patch)
treee2a2b5abbfd1e13448cc6c9e4400b41f281dddf9 /dnsdesig/tests/test_plugin.py
parent4468e8b611f85290633fee8ee9256297976bd2d9 (diff)
Seed code for Cloudify dns/Designate plugin
Change-Id: Ibc157472d6001076959face4ff3a06a808096f78 Issue-Id: CCSDK-66 Signed-off-by: Andrew Gauld <ag1282@att.com>
Diffstat (limited to 'dnsdesig/tests/test_plugin.py')
-rw-r--r--dnsdesig/tests/test_plugin.py272
1 files changed, 272 insertions, 0 deletions
diff --git a/dnsdesig/tests/test_plugin.py b/dnsdesig/tests/test_plugin.py
new file mode 100644
index 0000000..730897a
--- /dev/null
+++ b/dnsdesig/tests/test_plugin.py
@@ -0,0 +1,272 @@
+# ============LICENSE_START====================================================
+# org.onap.ccsdk
+# =============================================================================
+# Copyright (c) 2017 AT&T Intellectual Property. All rights reserved.
+# =============================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END======================================================
+
+import pytest
+import requests
+import dnsdesig.dns_plugin
+from cloudify.mocks import MockCloudifyContext
+from cloudify.state import current_ctx
+from cloudify.exceptions import NonRecoverableError
+from cloudify import ctx
+
+class _resp(object):
+ def __init__(self, code, body = None):
+ self.status_code = code
+ if body is not None:
+ self._json = body
+
+ def json(self):
+ return self._json
+
+def _same(a, b):
+ t1 = type(a)
+ t2 = type(b)
+ if t1 != t2:
+ return False
+ if t1 == dict:
+ if len(a) != len(b):
+ return False
+ for k, v in a.items():
+ if k not in b or not _same(v, b[k]):
+ return False
+ return True
+ if t1 == list:
+ if len(a) != len(b):
+ return False
+ for i in range(len(a)):
+ if not _same(a[i], b[i]):
+ return False
+ return True
+ return a == b
+
+class _req(object):
+ def __init__(self, op, url, headers, resp, json = None):
+ self.resp = resp
+ self.op = op
+ self.url = url
+ self.headers = headers
+ self.json = json
+
+ def check(self, op, url, headers, json):
+ if op != self.op or url != self.url:
+ return None
+ if self.headers is not None and not _same(self.headers, headers):
+ return None
+ if self.json is not None and not _same(self.json, json):
+ return None
+ return self.resp
+
+_nf = _resp(404)
+_ar = _resp(401)
+_np = _resp(403)
+_ok = _resp(200, { 'something': 'or-other' })
+
+_tok = 'at'
+
+_hdrs = { 'X-Auth-Token': _tok }
+
+_goodos = {
+ 'auth_url': 'https://example.com/identity',
+ 'password': 'pw',
+ 'region': 'r',
+ 'tenant_name': 'tn',
+ 'username': 'un'
+}
+
+_bados = {
+ 'auth_url': 'https://example.com/identity',
+ 'password': 'xx',
+ 'region': 'r',
+ 'tenant_name': 'tn',
+ 'username': 'un'
+}
+
+
+_answers = [
+ # Authenticate
+ _req('POST', 'https://example.com/identity/tokens', headers=None, resp=_resp(200, {
+ 'access': {
+ 'token': {
+ 'id': _tok
+ }, 'serviceCatalog': [
+ {
+ 'type': 'dns',
+ 'endpoints': [
+ {
+ 'publicURL': 'https://example.com/dns',
+ 'region': 'r'
+ }
+ ]
+ }
+ ]
+ }
+ }), json={
+ 'auth': {
+ 'tenantName': 'tn',
+ 'passwordCredentials': {
+ 'username': 'un',
+ 'password': 'pw'
+ }
+ }
+ }),
+ # Invalid authentication
+ _req('POST', 'https://example.com/identity/tokens', headers=None, resp=_np),
+ # Get zones
+ _req('GET', 'https://example.com/dns/v2/zones', headers=_hdrs, resp=_resp(200, {
+ 'zones': [
+ {
+ 'name': 'x.example.com.',
+ 'id': 'z1'
+ }
+ ]
+ })),
+ # Get recordsets
+ _req('GET', 'https://example.com/dns/v2/zones/z1/recordsets?limit=1000', headers=_hdrs, resp=_resp(200, {
+ 'recordsets': [
+ {
+ 'id': 'ar1',
+ 'type': 'A',
+ 'name': 'a.x.example.com.',
+ 'ttl': 300,
+ 'records': [
+ '87.65.43.21',
+ '98.76,54.32'
+ ]
+ }, {
+ 'id': 'cname1',
+ 'type': 'CNAME',
+ 'name': 'c.x.example.com.',
+ 'ttl': 300,
+ 'records': [
+ 'a.x.example.com.'
+ ]
+ }
+ ]
+ })),
+ # Bad auth
+ _req('GET', 'https://example.com/dns/v2/zones/z1/recordsets?limit=1000', headers=None, resp=_ar),
+ # Create A recordset
+ _req('POST', 'https://example.com/dns/v2/zones/z1/recordsets', headers=_hdrs, resp=_ok, json={
+ 'type': 'A',
+ 'name': 'b.x.example.com.',
+ 'ttl': 300,
+ 'records': [
+ '34.56.78.12'
+ ]
+ }),
+ # Create CNAME recordset
+ _req('POST', 'https://example.com/dns/v2/zones/z1/recordsets', headers=_hdrs, resp=_ok, json={
+ 'type': 'CNAME',
+ 'name': 'd.x.example.com.',
+ 'ttl': 300,
+ 'records': [
+ 'b.x.example.com.'
+ ]
+ }),
+ # Update A recordset
+ _req('PUT', 'https://example.com/dns/v2/zones/z1/recordsets/ar1', headers=_hdrs, resp=_ok, json={
+
+ 'ttl': 300,
+ 'records': [
+ '34.56.78.12'
+ ]
+ }),
+ # Update CNAME recordset
+ _req('PUT', 'https://example.com/dns/v2/zones/z1/recordsets/cname1', headers=_hdrs, resp=_ok, json={
+ 'ttl': 300,
+ 'records': [
+ 'b.x.example.com.'
+ ]
+ }),
+ # Delete A recordset
+ _req('DELETE', 'https://example.com/dns/v2/zones/z1/recordsets/ar1', headers=_hdrs, resp=_ok),
+ # Delete CNAME recordset
+ _req('DELETE', 'https://example.com/dns/v2/zones/z1/recordsets/cname1', headers=_hdrs, resp=_ok)
+]
+
+def _match(op, url, headers, json = None):
+ for choice in _answers:
+ ret = choice.check(op, url, headers, json)
+ if ret is not None:
+ return ret
+ return _nf
+
+def _delete(url, headers):
+ return _match('DELETE', url, headers)
+
+def _get(url, headers):
+ return _match('GET', url, headers)
+
+def _post(url, json, headers = None):
+ return _match('POST', url, headers, json)
+
+def _put(url, json, headers = None):
+ return _match('PUT', url, headers, json)
+
+def _setup(os, fqdn, ttl=None):
+ def fcnbuilder(fcn):
+ def newfcn(monkeypatch):
+ monkeypatch.setattr(requests, 'delete', _delete)
+ monkeypatch.setattr(requests, 'get', _get)
+ monkeypatch.setattr(requests, 'post', _post)
+ monkeypatch.setattr(requests, 'put', _put)
+ properties = { 'fqdn': fqdn, 'openstack': os }
+ if ttl is not None:
+ properties['ttl'] = ttl
+ mock_ctx = MockCloudifyContext(node_id='test_node_id', node_name='test_node_name', properties=properties)
+ try:
+ current_ctx.set(mock_ctx)
+ fcn()
+ finally:
+ current_ctx.clear()
+ return newfcn
+ return fcnbuilder
+
+@_setup(_bados, 'a.x.example.com')
+def test_dns_badauth():
+ with pytest.raises(NonRecoverableError):
+ dnsdesig.dns_plugin.anotneeded()
+
+@_setup(_goodos, 'a.bad.example.com')
+def test_dns_badzone():
+ with pytest.raises(NonRecoverableError):
+ dnsdesig.dns_plugin.anotneeded()
+
+@_setup(_goodos, 'b.x.example.com', 300)
+def test_dns_addarecord():
+ dnsdesig.dns_plugin.aneeded(args={'ip_addresses': [ '34.56.78.12' ]})
+
+@_setup(_goodos, 'a.x.example.com', 300)
+def test_dns_modarecord():
+ dnsdesig.dns_plugin.aneeded(args={'ip_addresses': [ '34.56.78.12' ]})
+
+@_setup(_goodos, 'a.x.example.com')
+def test_dns_delarecord():
+ dnsdesig.dns_plugin.anotneeded()
+
+@_setup(_goodos, 'd.x.example.com', 300)
+def test_dns_addcnamerecord():
+ dnsdesig.dns_plugin.cnameneeded(args={'cname': 'b.x.example.com' })
+
+@_setup(_goodos, 'c.x.example.com', 300)
+def test_dns_modcnamerecord():
+ dnsdesig.dns_plugin.cnameneeded(args={'cname': 'b.x.example.com' })
+
+@_setup(_goodos, 'c.x.example.com')
+def test_dns_delcname():
+ dnsdesig.dns_plugin.cnamenotneeded()