summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorxufeiliu <xl085b@att.com>2018-12-17 20:50:16 +0000
committerxufeiliu <xl085b@att.com>2018-12-17 20:50:37 +0000
commitf429e8f161c95c35d7247f928482dba5491d3665 (patch)
treeaac7eea964c58f3666f5c22ef7eb25124a10236b
parent2e93164490b3ad9cd85f63ad6619ec02947dd3a3 (diff)
sync ONAP DMaaP-plugin with AT&T internal
Changes: security enhancement, support topic-name, more logs Change-Id: I7649505847a49b32d56d6e891aebb2521b54a7bd Issue-ID: CCSDK-794 Signed-off-by: xufeiliu <xl085b@att.com>
-rw-r--r--dmaap/README.md8
-rw-r--r--dmaap/dmaap.yaml12
-rw-r--r--dmaap/dmaapcontrollerif/dmaap_requests.py46
-rw-r--r--dmaap/dmaapplugin/CommonLogger.config42
-rw-r--r--dmaap/dmaapplugin/__init__.py45
-rw-r--r--dmaap/dmaapplugin/dmaaputils.py5
-rw-r--r--dmaap/dmaapplugin/dr_bridge.py9
-rw-r--r--dmaap/dmaapplugin/dr_lifecycle.py44
-rw-r--r--dmaap/dmaapplugin/dr_relationships.py10
-rw-r--r--dmaap/dmaapplugin/mr_lifecycle.py21
-rw-r--r--dmaap/dmaapplugin/pkcrypto.py142
-rw-r--r--dmaap/setup.py2
12 files changed, 352 insertions, 34 deletions
diff --git a/dmaap/README.md b/dmaap/README.md
index 646400f..193be2d 100644
--- a/dmaap/README.md
+++ b/dmaap/README.md
@@ -29,7 +29,8 @@ of this type may have.
Property|Type|Required?|Description
--------|----|---------|----------------------------------------
-feed_id|string|yes|Feed identifier assigned by DMaaP when the feed was created
+feed_id|string|no|Feed identifier assigned by DMaaP when the feed was created
+feed_name|string|no|a name that identifies the feed
- `ccsdk.nodes.ExternalTargetFeed`: This type represents a feed created in an external DMaaP
environment (i.e., an environment that the plugin cannot access to make provisioning requests, such as
@@ -142,7 +143,7 @@ run against a blueprint that contains a node of this type.
Property|Type|Required?|Description
--------|----|---------|---------------------------------------
-topic_name|string|no|a name that uniquely identifies the feed (plugin will generate if absent)
+topic_name|string|no|a name that uniquely identifies the feed (plugin will generate if absent or is empty string or contain only whitespace)
topic_description|string|no|human-readable description of the feed
txenable|boolean|no|flag indicating whether transactions are enabled for this topic
replication_case|string|no|type of replication required for the topic (defaults to no replication)
@@ -172,7 +173,8 @@ of this type may have.
Property|Type|Required?|Description
--------|----|---------|----------------------------------------
-fqtn|string|yes|fully-qualified topic name for the topic
+fqtn|string|no|fully-qualified topic name for the topic
+topic_name|string|no|a name that identifies the topic
#### Interaction with Other Plugins
When creating a new topic or processing a reference to an existing topic,
diff --git a/dmaap/dmaap.yaml b/dmaap/dmaap.yaml
index 1c3ff43..9ea7820 100644
--- a/dmaap/dmaap.yaml
+++ b/dmaap/dmaap.yaml
@@ -28,7 +28,7 @@ plugins:
dmaapplugin:
executor: 'central_deployment_agent'
package_name: cloudifydmaapplugin
- package_version: 1.2.0
+ package_version: 1.2.0+t.0.11
node_types:
@@ -67,7 +67,10 @@ node_types:
properties:
feed_id:
type: string
- required: true
+ required: false
+ feed_name:
+ type: string
+ required: false
interfaces:
cloudify.interfaces.lifecycle:
@@ -132,7 +135,10 @@ node_types:
properties:
fqtn:
type: string
- required: true
+ required: false
+ topic_name:
+ type: string
+ required: false
interfaces:
cloudify.interfaces.lifecycle:
diff --git a/dmaap/dmaapcontrollerif/dmaap_requests.py b/dmaap/dmaapcontrollerif/dmaap_requests.py
index eb6fe1b..0c52a77 100644
--- a/dmaap/dmaapcontrollerif/dmaap_requests.py
+++ b/dmaap/dmaapcontrollerif/dmaap_requests.py
@@ -113,6 +113,21 @@ class DMaaPControllerHandle(object):
'''
return self._get_resource("{0}/{1}".format(self.feeds_path, feed_id))
+ def get_feed_info_by_name(self, feed_name):
+ '''
+ Get the representation of the DMaaP data router feed whose feed name is feed_name.
+ '''
+ feeds = self._get_resource("{0}".format(self.feeds_path))
+ feed_list = feeds.json()
+ for feed in feed_list:
+ if feed["feedName"] == feed_name:
+ self.logger.info("Found feed with {0}".format(feed_name))
+ feed_id = feed["feedId"]
+ return self._get_resource("{0}/{1}".format(self.feeds_path, feed_id))
+
+ self.logger.info("feed_name {0} not found".format(feed_name))
+ return None
+
def delete_feed(self, feed_id):
'''
Delete the DMaaP data router feed whose feed id is feed_id.
@@ -205,6 +220,21 @@ class DMaaPControllerHandle(object):
'''
return self._get_resource("{0}/{1}".format(self.topics_path, fqtn))
+ def get_topic_fqtn_by_name(self, topic_name):
+ '''
+ Get the representation of the DMaaP message router topic fqtn whose topic name is topic_name.
+ '''
+ topics = self._get_resource("{0}".format(self.topics_path))
+ topic_list = topics.json()
+ for topic in topic_list:
+ if topic["topicName"] == topic_name:
+ self.logger.info("Found existing topic with name {0}".format(topic_name))
+ fqtn = topic["fqtn"]
+ return fqtn
+
+ self.logger.info("topic_name {0} not found".format(topic_name))
+ return None
+
def delete_topic(self, fqtn):
'''
Delete the topic whose fully qualified name is 'fqtn'
@@ -243,7 +273,6 @@ class DMaaPControllerHandle(object):
'''
Get the list of location names known to the DMaaP bus controller
whose "dcaeLayer" property matches dcae_layer and whose status is "VALID".
- "dcaeLayer" is "opendcae-central" for central sites.
'''
# Do these as a separate step so things like 404 get reported precisely
locations = self._get_resource(LOCATIONS_PATH)
@@ -254,3 +283,18 @@ class DMaaPControllerHandle(object):
filter(lambda i : (i['dcaeLayer'] == dcae_layer and i['status'] == 'VALID'),
locations.json()))
+ def get_dcae_central_locations(self):
+ '''
+ Get the list of location names known to the DMaaP bus controller
+ whose "dcaeLayer" property contains "central" (ignoring case) and whose status is "VALID".
+ "dcaeLayer" contains "central" for central sites.
+ '''
+ # Do these as a separate step so things like 404 get reported precisely
+ locations = self._get_resource(LOCATIONS_PATH)
+ locations.raise_for_status()
+
+ # pull out location names for VALID central locations
+ return map(lambda l: l["dcaeLocationName"],
+ filter(lambda i : ('central' in i['dcaeLayer'].lower() and i['status'] == 'VALID'),
+ locations.json()))
+
diff --git a/dmaap/dmaapplugin/CommonLogger.config b/dmaap/dmaapplugin/CommonLogger.config
new file mode 100644
index 0000000..3f0dd69
--- /dev/null
+++ b/dmaap/dmaapplugin/CommonLogger.config
@@ -0,0 +1,42 @@
+# You may change this file while your program is running and CommonLogger will automatically reconfigure accordingly.
+# Changing these parameters may leave old log files lying around.
+
+
+#--- Parameters that apply to all logs
+#
+# rotateMethod: time, size, stdout, stderr, none
+#... Note: the following two parameters apply only when rotateMethod=time
+# timeRotateIntervalType: S, M, H, D, W0 - W6, or midnight (seconds, minutes, hours, days, weekday (0=Monday), or midnight UTC)
+# timeRotateInterval: >= 1 (1 means every timeRotateIntervalType, 2 every other, 3 every third, etc.)
+#... Note: the following parameter applies only when rotateMethod=size
+# sizeMaxBytes: >= 0 (0 means no limit, else maximum filesize in Bytes)
+# backupCount: >= 0 (Number of rotated backup files to retain. If rotateMethod=time, 0 retains *all* backups. If rotateMethod=size, 0 retains *no* backups.)
+#
+rotateMethod = size
+timeRotateIntervalType = midnight
+timeRotateInterval = 1
+sizeMaxBytes = 10000000
+backupCount = 4
+
+
+#--- Parameters that define log filenames and their initial LogLevel threshold
+#... Note: CommonLogger will exit if your process does not have permission to write to the file.
+#
+# LogLevel options: FATAL, ERROR, WARN, INFO, DEBUG
+#
+
+error = /opt/logs/dcae/cloudifymgrplugins/error.log
+errorLogLevel = WARN
+errorStyle = error
+
+metrics = /opt/logs/dcae/cloudifymgrplugins/metrics.log
+metricsLogLevel = INFO
+metricsStyle = metrics
+
+audit = /opt/logs/dcae/cloudifymgrplugins/audit.log
+auditLogLevel = INFO
+auditStyle = audit
+
+debug = /opt/logs/dcae/cloudifymgrplugins/debug.log
+debugLogLevel = INFO
+debugStyle = debug
diff --git a/dmaap/dmaapplugin/__init__.py b/dmaap/dmaapplugin/__init__.py
index 130c0bf..24466e9 100644
--- a/dmaap/dmaapplugin/__init__.py
+++ b/dmaap/dmaapplugin/__init__.py
@@ -19,28 +19,63 @@
## Get parameters for accessing the DMaaP controller
from consulif.consulif import ConsulHandle
from cloudify.exceptions import NonRecoverableError
+import os
+import pkcrypto
+
+os.environ["REQUESTS_CA_BUNDLE"]="/etc/pki/ca-trust/extracted/openssl/ca-bundle.trust.crt" # This is to handle https request thru plugin
CONSUL_HOST = "127.0.0.1" # Should always be a local consul agent on Cloudify Manager
-CM_SERVICE_NAME = "cloudify_manager" # Name under which CM is registered, used as key to get config
+DBCL_KEY_NAME = "dmaap_dbcl_info" # Consul key containing DMaaP data bus credentials
DBC_SERVICE_NAME= "dmaap_bus_controller" # Name under which the DMaaP bus controller is registered
try:
_ch = ConsulHandle("http://{0}:8500".format(CONSUL_HOST), None, None, None)
- config = _ch.get_config(CM_SERVICE_NAME)
+except Exception as e:
+ raise NonRecoverableError("Error getting ConsulHandle when configuring dmaap plugin: {0}".format(e))
+
+try:
+ config = _ch.get_config(DBCL_KEY_NAME)
+except Exception as e:
+ raise NonRecoverableError("Error getting config for '{0}' from ConsulHandle when configuring dmaap plugin: {1}".format(DBCL_KEY_NAME, e))
+
+try:
DMAAP_USER = config['dmaap']['username']
- DMAAP_PASS = config['dmaap']['password']
+except Exception as e:
+ raise NonRecoverableError("Error setting DMAAP_USER while configuring dmaap plugin: {0}".format(e))
+
+try:
+ DMAAP_PASS = pkcrypto.decrypt_obj(config['dmaap']['password'])
+except Exception as e:
+ raise NonRecoverableError("Error setting DMAAP_PASS while configuring dmaap plugin: {0}".format(e))
+
+try:
DMAAP_OWNER = config['dmaap']['owner']
+except Exception as e:
+ raise NonRecoverableError("Error setting DMAAP_OWNER while configuring dmaap plugin: {0}".format(e))
+
+try:
if 'protocol' in config['dmaap']:
DMAAP_PROTOCOL = config['dmaap']['protocol']
else:
DMAAP_PROTOCOL = 'https' # Default to https (service discovery should give us this but doesn't
+except Exception as e:
+ raise NonRecoverableError("Error setting DMAAP_PROTOCOL while configuring dmaap plugin: {0}".format(e))
+
+try:
if 'path' in config['dmaap']:
DMAAP_PATH = config['dmaap']['path']
else:
DMAAP_PATH = 'webapi' # SHould come from service discovery but Consul doesn't support it
+except Exception as e:
+ raise NonRecoverableError("Error setting DMAAP_PATH while configuring dmaap plugin: {0}".format(e))
+try:
service_address, service_port = _ch.get_service(DBC_SERVICE_NAME)
- DMAAP_API_URL = '{0}://{1}:{2}/{3}'.format(DMAAP_PROTOCOL, service_address, service_port, DMAAP_PATH)
+except Exception as e:
+ raise NonRecoverableError("Error getting service_address and service_port for '{0}' from ConsulHandle when configuring dmaap plugin: {1}".format(DBC_SERVICE_NAME, e))
+try:
+ DMAAP_API_URL = '{0}://{1}:{2}/{3}'.format(DMAAP_PROTOCOL, service_address, service_port, DMAAP_PATH)
except Exception as e:
- raise NonRecoverableError("Error configuring dmaap plugin: {0}".format(e))
+ raise NonRecoverableError("Error setting DMAAP_API_URL while configuring dmaap plugin: {0}".format(e))
+
diff --git a/dmaap/dmaapplugin/dmaaputils.py b/dmaap/dmaapplugin/dmaaputils.py
index e043a07..9e80416 100644
--- a/dmaap/dmaapplugin/dmaaputils.py
+++ b/dmaap/dmaapplugin/dmaaputils.py
@@ -19,10 +19,11 @@
# Utility functions
import string
-import random
+from random import SystemRandom
def random_string(n):
'''
Create a random alphanumeric string, n characters long.
'''
- return ''.join(random.choice(string.ascii_lowercase + string.ascii_uppercase + string.digits) for x in range(n))
+ secureRandomGen = SystemRandom()
+ return ''.join(secureRandomGen.choice(string.ascii_lowercase + string.ascii_uppercase + string.digits) for x in range(n))
diff --git a/dmaap/dmaapplugin/dr_bridge.py b/dmaap/dmaapplugin/dr_bridge.py
index 4e0df4d..c103b41 100644
--- a/dmaap/dmaapplugin/dr_bridge.py
+++ b/dmaap/dmaapplugin/dr_bridge.py
@@ -22,6 +22,7 @@ from cloudify.exceptions import NonRecoverableError
from dmaapplugin import DMAAP_API_URL, DMAAP_USER, DMAAP_PASS
from dmaaputils import random_string
from dmaapcontrollerif.dmaap_requests import DMaaPControllerHandle
+import pkcrypto
# Set up a subscriber to a source feed
def _set_up_subscriber(dmc, source_feed_id, loc, delivery_url, username, userpw):
@@ -33,7 +34,7 @@ def _set_up_subscriber(dmc, source_feed_id, loc, delivery_url, username, userpw)
# Set up a publisher to a target feed
def _set_up_publisher(dmc, target_feed_id, loc):
username = random_string(8)
- userpw = random_string(10)
+ userpw = random_string(16)
add_pub = dmc.add_publisher(target_feed_id, loc, username, userpw)
add_pub.raise_for_status()
pub_info = add_pub.json()
@@ -41,7 +42,7 @@ def _set_up_publisher(dmc, target_feed_id, loc):
# Get a central location to use when creating a publisher or subscriber
def _get_central_location(dmc):
- locations = dmc.get_dcae_locations('opendcae-central')
+ locations = dmc.get_dcae_central_locations()
if len(locations) < 1:
raise Exception('No central location found for setting up DR bridging')
return locations[0] # We take the first one. Typically there will be two central locations
@@ -107,10 +108,10 @@ def create_external_dr_bridge(**kwargs):
try:
# Make sure target feed has full set of properties
- if 'url' in ctx.target.node.properties and 'username' in ctx.target.node.properties and 'userpw' in ctx.target.node.properties:
+ if 'url' in ctx.target.node.properties and 'username' in ctx.target.node.properties and 'userpw' in ctx.target.node.properties:
url = ctx.target.node.properties['url']
username = ctx.target.node.properties['username']
- userpw = ctx.target.node.properties['userpw']
+ userpw = pkcrypto.decrypt_obj(ctx.target.node.properties['userpw'])
else:
raise Exception ("Target feed missing url, username, and/or user pw")
diff --git a/dmaap/dmaapplugin/dr_lifecycle.py b/dmaap/dmaapplugin/dr_lifecycle.py
index 45f8674..7473838 100644
--- a/dmaap/dmaapplugin/dr_lifecycle.py
+++ b/dmaap/dmaapplugin/dr_lifecycle.py
@@ -30,6 +30,7 @@ def create_feed(**kwargs):
'''
Create a new data router feed
Expects "feed_name" to be set in node properties
+ If 'feed_name' is not set or is empty, generates a random one.
Allows "feed_version", "feed_description", and "aspr_classification" as optional properties
(Sets default values if not provided )
Sets instance runtime properties:
@@ -40,9 +41,8 @@ def create_feed(**kwargs):
'''
try:
# Make sure there's a feed_name
- if "feed_name" in ctx.node.properties.keys():
- feed_name = ctx.node.properties["feed_name"]
- else:
+ feed_name = ctx.node.properties.get("feed_name")
+ if not (feed_name and feed_name.strip()):
feed_name = random_string(12)
# Set defaults/placeholders for the optional properties for the feed
@@ -76,6 +76,7 @@ def create_feed(**kwargs):
ctx.logger.error("Error creating feed: {er}".format(er=e))
raise NonRecoverableError(e)
+
@operation
def get_existing_feed(**kwargs):
'''
@@ -86,23 +87,48 @@ def get_existing_feed(**kwargs):
- "publish_url"
- "log_url"
'''
+
try:
# Make the lookup request to the controller
dmc = DMaaPControllerHandle(DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, ctx.logger)
- f = dmc.get_feed_info(ctx.node.properties["feed_id"])
- f.raise_for_status()
+ ctx.logger.info("DMaaPControllerHandle() returned")
+ feed_id_input = False
+ if "feed_id" in ctx.node.properties.keys():
+ feed_id_input = True
+ f = dmc.get_feed_info(ctx.node.properties["feed_id"])
+ elif "feed_name" in ctx.node.properties.keys():
+ feed_name = ctx.node.properties["feed_name"]
+ f = dmc.get_feed_info_by_name(feed_name)
+ if f is None:
+ ctx.logger.error("Not find existing feed with feed name {0}".format(feed_name))
+ raise ValueError("Not find existing feed with feed name " + feed_name)
+ else:
+ raise ValueError("Either feed_id or feed_name must be defined to get existing feed")
+ f.raise_for_status()
+
# Capture important properties from the result
feed = f.json()
- ctx.instance.runtime_properties["feed_id"] = ctx.node.properties["feed_id"] # Just to be consistent with newly-created node, above
+ feed_id = feed["feedId"]
+ ctx.instance.runtime_properties["feed_id"] = feed_id # Just to be consistent with newly-created node, above
ctx.instance.runtime_properties["publish_url"] = feed["publishURL"]
ctx.instance.runtime_properties["log_url"] = feed["logURL"]
- ctx.logger.info("Found existing feed with feed id {0}".format(ctx.node.properties["feed_id"]))
+ if feed_id_input:
+ ctx.logger.info("Found existing feed with feed id {0}".format(ctx.node.properties["feed_id"]))
+ else:
+ ctx.logger.info("Found existing feed with feed name {0}".format(ctx.node.properties["feed_name"]))
- except Exception as e:
- ctx.logger.error("Error getting existing feed id {id}: {er}".format(id=ctx.node.properties["feed_id"],er=e))
+ except ValueError as e:
+ ctx.logger.error("{er}".format(er=e))
+ raise NonRecoverableError(e)
+ except Exception as e:
+ if feed_id_input:
+ ctx.logger.error("Error getting existing feed id {id}: {er}".format(id=ctx.node.properties["feed_id"],er=e))
+ else:
+ ctx.logger.error("Error getting existing feed name {name}: {er}".format(name=ctx.node.properties["feed_name"],er=e))
raise NonRecoverableError(e)
+
@operation
def delete_feed(**kwargs):
'''
diff --git a/dmaap/dmaapplugin/dr_relationships.py b/dmaap/dmaapplugin/dr_relationships.py
index 8796354..eff0fa1 100644
--- a/dmaap/dmaapplugin/dr_relationships.py
+++ b/dmaap/dmaapplugin/dr_relationships.py
@@ -59,7 +59,7 @@ def add_dr_publisher(**kwargs):
feed_id = ctx.target.instance.runtime_properties["feed_id"]
location = ctx.source.instance.runtime_properties[target_feed]["location"]
username = random_string(8)
- password = random_string(10)
+ password = random_string(16)
# Make the request to add the publisher to the feed
dmc = DMaaPControllerHandle(DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, ctx.logger)
@@ -81,7 +81,9 @@ def add_dr_publisher(**kwargs):
# Set key in Consul
ch = ConsulHandle("http://{0}:8500".format(CONSUL_HOST), None, None, ctx.logger)
- ch.add_to_entry("{0}:dmaap".format(ctx.source.instance.runtime_properties['service_component_name']), target_feed, ctx.source.instance.runtime_properties[target_feed])
+ cpy = dict(ctx.source.instance.runtime_properties[target_feed])
+ # cpy["password"] = pkcrypto.encrypt_string(cpy["password"]) # can't encrypt until collectors can decrypt
+ ch.add_to_entry("{0}:dmaap".format(ctx.source.instance.runtime_properties['service_component_name']), target_feed, cpy)
except Exception as e:
ctx.logger.error("Error adding publisher to feed: {er}".format(er=e))
@@ -173,7 +175,9 @@ def add_dr_subscriber(**kwargs):
# Set key in Consul
ch = ConsulHandle("http://{0}:8500".format(CONSUL_HOST), None, None, ctx.logger)
- ch.add_to_entry("{0}:dmaap".format(ctx.source.instance.runtime_properties['service_component_name']), target_feed, ctx.source.instance.runtime_properties[target_feed])
+ cpy = dict(ctx.source.instance.runtime_properties[target_feed])
+ # cpy["password"] = pkcrypto.encrypt_string(cpy["password"]) # can't encrypt until collectors can decrypt
+ ch.add_to_entry("{0}:dmaap".format(ctx.source.instance.runtime_properties['service_component_name']), target_feed, cpy)
except Exception as e:
ctx.logger.error("Error adding subscriber to feed: {er}".format(er=e))
diff --git a/dmaap/dmaapplugin/mr_lifecycle.py b/dmaap/dmaapplugin/mr_lifecycle.py
index 16ad953..3e800b9 100644
--- a/dmaap/dmaapplugin/mr_lifecycle.py
+++ b/dmaap/dmaapplugin/mr_lifecycle.py
@@ -39,6 +39,8 @@ def create_topic(**kwargs):
# Make sure there's a topic_name
if "topic_name" in ctx.node.properties:
topic_name = ctx.node.properties["topic_name"]
+ if topic_name == '' or topic_name.isspace():
+ topic_name = random_string(12)
else:
topic_name = random_string(12)
@@ -82,7 +84,7 @@ def create_topic(**kwargs):
@operation
def get_existing_topic(**kwargs):
'''
- Get data for an existing feed.
+ Get data for an existing topic.
Expects 'fqtn' as a node property.
Copies this property to 'fqtn' in runtime properties for consistency
with a newly-created topic.
@@ -91,13 +93,26 @@ def get_existing_topic(**kwargs):
don't run into problems when we try to add a publisher or subscriber later.
'''
try:
- fqtn = ctx.node.properties["fqtn"]
dmc = DMaaPControllerHandle(DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, ctx.logger)
+ fqtn_input = False
+ if "fqtn" in ctx.node.properties:
+ fqtn = ctx.node.properties["fqtn"]
+ fqtn_input = True
+ elif "topic_name" in ctx.node.properties:
+ topic_name = ctx.node.properties["topic_name"]
+ ctx.logger.info("Attempting to get fqtn for existing topic {0}".format(topic_name))
+ fqtn = dmc.get_topic_fqtn_by_name(topic_name)
+ if fqtn is None:
+ raise ValueError("Not find existing topic with name " + topic_name)
+ else:
+ ctx.logger..error("Not find existing topic with name {0}".format(topic_name))
+ raise ValueError("Either fqtn or topic_name must be defined to get existing topic")
+
ctx.logger.info("Attempting to get info for existing topic {0}".format(fqtn))
t = dmc.get_topic_info(fqtn)
t.raise_for_status()
- ctx.instance.runtime_properties["fqtn"] = ctx.node.properties["fqtn"]
+ ctx.instance.runtime_properties["fqtn"] = fqtn
except Exception as e:
ctx.logger.error("Error getting existing topic: {er}".format(er=e))
diff --git a/dmaap/dmaapplugin/pkcrypto.py b/dmaap/dmaapplugin/pkcrypto.py
new file mode 100644
index 0000000..9bd2a3f
--- /dev/null
+++ b/dmaap/dmaapplugin/pkcrypto.py
@@ -0,0 +1,142 @@
+"""
+RSA encryption and decryption functions
+
+pkcrypto.py
+
+Written by: Terry Schmalzried
+Date written: September 20, 2017
+Last updated: September 27, 2017
+"""
+
+from __future__ import print_function
+import sys, subprocess, json
+
+
+def encrypt_string(clear_text):
+ """RSA encrypt a string of limited length"""
+
+ # Use Carsten's jar files and the key already installed on the host
+ cmd = ['/usr/bin/java',
+ '-cp', '/opt/lib/log4j-1.2.17.jar:/opt/lib/ncomp-utils-java-1.17070100.0-SNAPSHOT.jar',
+ 'org.openecomp.ncomp.utils.CryptoUtils',
+ 'public-key-encrypt',
+ '/opt/dcae/server.public'
+ ]
+ try:
+ p = subprocess.Popen(cmd, shell=False, stdout=subprocess.PIPE, stdin=subprocess.PIPE, stderr=subprocess.PIPE)
+ (stdout_data, stderr_data) = p.communicate(input=clear_text)
+ except Exception as e:
+ print("encrypt_string exception: {}".format(e), file=sys.stderr)
+ return None
+
+ if stderr_data:
+ print("encrypt_string stderr: {}".format(stderr_data), file=sys.stderr)
+ return None
+
+ return stdout_data.replace(" ","").rstrip('\n')
+
+
+def decrypt_string(encrypted_text):
+ """RSA decrypt a string"""
+
+ # Use Carsten's jar files and the key already installed on the host
+ cmd = ['sudo', '/usr/bin/java',
+ '-cp', '/opt/lib/log4j-1.2.17.jar:/opt/lib/ncomp-utils-java-1.17070100.0-SNAPSHOT.jar',
+ 'org.openecomp.ncomp.utils.CryptoUtils',
+ 'public-key-decrypt',
+ '/opt/dcae/server.private',
+ encrypted_text
+ ]
+ try:
+ p = subprocess.Popen(cmd, shell=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+ (stdout_data, stderr_data) = p.communicate()
+ except Exception as e:
+ print("decrypt_string exception: {}".format(e), file=sys.stderr)
+ return None
+
+ if stderr_data:
+ print("decrypt_string stderr: {}".format(stderr_data), file=sys.stderr)
+ return None
+
+ return stdout_data.rstrip('\n')
+
+
+def decrypt_obj(obj):
+ """decrypt all RSA string values in a python nested object and embedded JSON string objects"""
+
+ if isinstance(obj, dict):
+ return {k: decrypt_obj(v) for k,v in obj.items()}
+ elif isinstance(obj, list):
+ return [decrypt_obj(v) for v in obj]
+ elif isinstance(obj, basestring):
+ if obj.startswith("rsa:"):
+ obj2 = decrypt_string(obj)
+ if obj2 is not None:
+ return obj2
+ else:
+ try:
+ obj2 = json.loads(obj)
+ return json.dumps(decrypt_obj(obj2))
+ except Exception as e:
+ pass
+ return obj
+
+
+if __name__ == '__main__':
+ clear_text = "a secret"
+ print("Encrypting: {}".format(clear_text))
+ encrypted = encrypt_string(clear_text)
+ print("Encrypted: {}".format(encrypted))
+ print("Decrypted: {}".format(decrypt_string(encrypted)))
+
+
+ # print("\nWhitespace in the encrypted string does not seem to matter:")
+ # encrypted = 'rsa:Y2feMIiKwR0Df3zVDDf1K+4Lkt9vxGnT8UugHkjNLiht67PwXRJFP6/BbmZO9NhlOAMV3MLWwbhU GikE96K7wuQaQVYOmAYNNuVDWLdvbW80pZVGKYgQsmrLizOhPbhD+adG7bdIiNMNMBOKk+XQMTLa d77KzAQmZO2wLj0Z3As='
+ # print("Decrypted: {}".format(decrypt_string(encrypted)))
+
+ # encrypted = '''rsa:Y2feMIiKwR0Df3zVDDf1K+4Lkt9vxGnT8UugHkjNLiht67PwXRJFP6/BbmZO9NhlOAMV3MLWwbhU
+ # GikE96K7wuQaQVYOmAYNNuVDWLdvbW80pZVGKYgQsmrLizOhPbhD+adG7bdIiNMNMBOKk+XQMTLa
+ # d77KzAQmZO2wLj0Z3As='''
+ # print("Decrypted: {}".format(decrypt_string(encrypted)))
+
+
+ print("\nDecrypt some dicts:")
+ print("Decrypted: {}".format(decrypt_obj('not encrypted')))
+ print("Decrypted: {}".format(decrypt_obj(encrypted)))
+ print("Decrypted: {}".format(decrypt_obj({
+ "key1":encrypted,
+ "key2":"not encrypted",
+ "key3":encrypted,
+ "key4":{
+ "key11":encrypted,
+ "key12":"not encrypted",
+ "key13":encrypted,
+ "key14":[
+ encrypted,
+ "not encrypted",
+ encrypted
+ ]
+ }
+ })))
+
+
+ print("\nDecrypt some JSON:")
+ encrypted = json.dumps([{ "username": "m01234@bogus.att.com",
+ "password": encrypt_string("N0t_a-Rea1/passw0rd"),
+ "registry": "dockercentral.it.att.com:12345"
+ }])
+ print("Encrypted: {}".format(encrypted))
+ print("Decrypted: {}".format(decrypt_obj(encrypted)))
+
+
+ print("\nDecrypt a dict that contains a json string containing encrypted keys:")
+ a_dict = {
+ "clear_txt": clear_text,
+ "encrypted_str": encrypt_string(clear_text),
+ "json_str": encrypted
+ }
+ print("Decrypted: {}".format(decrypt_obj(a_dict)))
+
+
+ print("\nDecrypt a json string that contains a dict that contains a json string containing encrypted keys:")
+ print("Decrypted: {}".format(decrypt_obj(json.dumps(a_dict))))
diff --git a/dmaap/setup.py b/dmaap/setup.py
index 0d23668..3eb16f7 100644
--- a/dmaap/setup.py
+++ b/dmaap/setup.py
@@ -2,7 +2,7 @@ from setuptools import setup, find_packages
setup(
name = "cloudifydmaapplugin",
- version = "1.2.0",
+ version = "1.2.0+t.0.11",
packages=find_packages(),
author = "AT&T",
description = ("Cloudify plugin for creating DMaaP feeds and topics, and setting up publishers and subscribers."),