path: root/dnsdesig/tests/
diff options
Diffstat (limited to 'dnsdesig/tests/')
1 files changed, 272 insertions, 0 deletions
diff --git a/dnsdesig/tests/ b/dnsdesig/tests/
new file mode 100644
index 0000000..730897a
--- /dev/null
+++ b/dnsdesig/tests/
@@ -0,0 +1,272 @@
+# ============LICENSE_START====================================================
+# org.onap.ccsdk
+# =============================================================================
+# Copyright (c) 2017 AT&T Intellectual Property. All rights reserved.
+# =============================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END======================================================
+import pytest
+import requests
+import dnsdesig.dns_plugin
+from cloudify.mocks import MockCloudifyContext
+from cloudify.state import current_ctx
+from cloudify.exceptions import NonRecoverableError
+from cloudify import ctx
+class _resp(object):
+ def __init__(self, code, body = None):
+ self.status_code = code
+ if body is not None:
+ self._json = body
+ def json(self):
+ return self._json
+def _same(a, b):
+ t1 = type(a)
+ t2 = type(b)
+ if t1 != t2:
+ return False
+ if t1 == dict:
+ if len(a) != len(b):
+ return False
+ for k, v in a.items():
+ if k not in b or not _same(v, b[k]):
+ return False
+ return True
+ if t1 == list:
+ if len(a) != len(b):
+ return False
+ for i in range(len(a)):
+ if not _same(a[i], b[i]):
+ return False
+ return True
+ return a == b
+class _req(object):
+ def __init__(self, op, url, headers, resp, json = None):
+ self.resp = resp
+ self.op = op
+ self.url = url
+ self.headers = headers
+ self.json = json
+ def check(self, op, url, headers, json):
+ if op != self.op or url != self.url:
+ return None
+ if self.headers is not None and not _same(self.headers, headers):
+ return None
+ if self.json is not None and not _same(self.json, json):
+ return None
+ return self.resp
+_nf = _resp(404)
+_ar = _resp(401)
+_np = _resp(403)
+_ok = _resp(200, { 'something': 'or-other' })
+_tok = 'at'
+_hdrs = { 'X-Auth-Token': _tok }
+_goodos = {
+ 'auth_url': '',
+ 'password': 'pw',
+ 'region': 'r',
+ 'tenant_name': 'tn',
+ 'username': 'un'
+_bados = {
+ 'auth_url': '',
+ 'password': 'xx',
+ 'region': 'r',
+ 'tenant_name': 'tn',
+ 'username': 'un'
+_answers = [
+ # Authenticate
+ _req('POST', '', headers=None, resp=_resp(200, {
+ 'access': {
+ 'token': {
+ 'id': _tok
+ }, 'serviceCatalog': [
+ {
+ 'type': 'dns',
+ 'endpoints': [
+ {
+ 'publicURL': '',
+ 'region': 'r'
+ }
+ ]
+ }
+ ]
+ }
+ }), json={
+ 'auth': {
+ 'tenantName': 'tn',
+ 'passwordCredentials': {
+ 'username': 'un',
+ 'password': 'pw'
+ }
+ }
+ }),
+ # Invalid authentication
+ _req('POST', '', headers=None, resp=_np),
+ # Get zones
+ _req('GET', '', headers=_hdrs, resp=_resp(200, {
+ 'zones': [
+ {
+ 'name': '',
+ 'id': 'z1'
+ }
+ ]
+ })),
+ # Get recordsets
+ _req('GET', '', headers=_hdrs, resp=_resp(200, {
+ 'recordsets': [
+ {
+ 'id': 'ar1',
+ 'type': 'A',
+ 'name': '',
+ 'ttl': 300,
+ 'records': [
+ '',
+ '98.76,54.32'
+ ]
+ }, {
+ 'id': 'cname1',
+ 'type': 'CNAME',
+ 'name': '',
+ 'ttl': 300,
+ 'records': [
+ ''
+ ]
+ }
+ ]
+ })),
+ # Bad auth
+ _req('GET', '', headers=None, resp=_ar),
+ # Create A recordset
+ _req('POST', '', headers=_hdrs, resp=_ok, json={
+ 'type': 'A',
+ 'name': '',
+ 'ttl': 300,
+ 'records': [
+ ''
+ ]
+ }),
+ # Create CNAME recordset
+ _req('POST', '', headers=_hdrs, resp=_ok, json={
+ 'type': 'CNAME',
+ 'name': '',
+ 'ttl': 300,
+ 'records': [
+ ''
+ ]
+ }),
+ # Update A recordset
+ _req('PUT', '', headers=_hdrs, resp=_ok, json={
+ 'ttl': 300,
+ 'records': [
+ ''
+ ]
+ }),
+ # Update CNAME recordset
+ _req('PUT', '', headers=_hdrs, resp=_ok, json={
+ 'ttl': 300,
+ 'records': [
+ ''
+ ]
+ }),
+ # Delete A recordset
+ _req('DELETE', '', headers=_hdrs, resp=_ok),
+ # Delete CNAME recordset
+ _req('DELETE', '', headers=_hdrs, resp=_ok)
+def _match(op, url, headers, json = None):
+ for choice in _answers:
+ ret = choice.check(op, url, headers, json)
+ if ret is not None:
+ return ret
+ return _nf
+def _delete(url, headers):
+ return _match('DELETE', url, headers)
+def _get(url, headers):
+ return _match('GET', url, headers)
+def _post(url, json, headers = None):
+ return _match('POST', url, headers, json)
+def _put(url, json, headers = None):
+ return _match('PUT', url, headers, json)
+def _setup(os, fqdn, ttl=None):
+ def fcnbuilder(fcn):
+ def newfcn(monkeypatch):
+ monkeypatch.setattr(requests, 'delete', _delete)
+ monkeypatch.setattr(requests, 'get', _get)
+ monkeypatch.setattr(requests, 'post', _post)
+ monkeypatch.setattr(requests, 'put', _put)
+ properties = { 'fqdn': fqdn, 'openstack': os }
+ if ttl is not None:
+ properties['ttl'] = ttl
+ mock_ctx = MockCloudifyContext(node_id='test_node_id', node_name='test_node_name', properties=properties)
+ try:
+ current_ctx.set(mock_ctx)
+ fcn()
+ finally:
+ current_ctx.clear()
+ return newfcn
+ return fcnbuilder
+@_setup(_bados, '')
+def test_dns_badauth():
+ with pytest.raises(NonRecoverableError):
+ dnsdesig.dns_plugin.anotneeded()
+@_setup(_goodos, '')
+def test_dns_badzone():
+ with pytest.raises(NonRecoverableError):
+ dnsdesig.dns_plugin.anotneeded()
+@_setup(_goodos, '', 300)
+def test_dns_addarecord():
+ dnsdesig.dns_plugin.aneeded(args={'ip_addresses': [ '' ]})
+@_setup(_goodos, '', 300)
+def test_dns_modarecord():
+ dnsdesig.dns_plugin.aneeded(args={'ip_addresses': [ '' ]})
+@_setup(_goodos, '')
+def test_dns_delarecord():
+ dnsdesig.dns_plugin.anotneeded()
+@_setup(_goodos, '', 300)
+def test_dns_addcnamerecord():
+ dnsdesig.dns_plugin.cnameneeded(args={'cname': '' })
+@_setup(_goodos, '', 300)
+def test_dns_modcnamerecord():
+ dnsdesig.dns_plugin.cnameneeded(args={'cname': '' })
+@_setup(_goodos, '')
+def test_dns_delcname():
+ dnsdesig.dns_plugin.cnamenotneeded()