summaryrefslogtreecommitdiffstats
path: root/cdap/cdapplugin/cdapcloudify/cdap_plugin.py
diff options
context:
space:
mode:
Diffstat (limited to 'cdap/cdapplugin/cdapcloudify/cdap_plugin.py')
-rw-r--r--cdap/cdapplugin/cdapcloudify/cdap_plugin.py92
1 files changed, 68 insertions, 24 deletions
diff --git a/cdap/cdapplugin/cdapcloudify/cdap_plugin.py b/cdap/cdapplugin/cdapcloudify/cdap_plugin.py
index b4d22cd..1de0173 100644
--- a/cdap/cdapplugin/cdapcloudify/cdap_plugin.py
+++ b/cdap/cdapplugin/cdapcloudify/cdap_plugin.py
@@ -5,9 +5,9 @@
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
-#
+#
# http://www.apache.org/licenses/LICENSE-2.0
-#
+#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -16,10 +16,11 @@
# ============LICENSE_END=========================================================
#
# ECOMP is a trademark and service mark of AT&T Intellectual Property.
+from dcaepolicy import Policies
import requests
from cloudify import ctx
-from cloudify.decorators import operation
+from cloudify.decorators import operation
from cloudify.exceptions import NonRecoverableError
import time
import uuid
@@ -27,9 +28,6 @@ import re
from cdapcloudify import discovery
import json
-"""
-TODO: Tons of crappy URL forming going on here...
-"""
# Property keys
SERVICE_COMPONENT_NAME = "service_component_name"
SELECTED_BROKER = "selected_broker"
@@ -45,6 +43,19 @@ class BadConnections(NonRecoverableError):
pass
+def _trigger_update(updated_policies):
+ """
+ Helper function for reconfiguring after a policy update
+ """
+ for p in updated_policies:
+ ctx.logger.info("Reconfiguring CDAP application via smart interface")
+ return discovery.reconfigure_in_broker(cdap_broker_name = ctx.instance.runtime_properties[SELECTED_BROKER],
+ service_component_name = ctx.instance.runtime_properties[SERVICE_COMPONENT_NAME],
+ config = p,
+ reconfiguration_type = "program-flowlet-smart",
+ logger = ctx.logger)
+
+
def _validate_conns(connections):
"""
Cloudify allows you to type spec a data type in a type file, however it does not appear to do strict checking on blueprints against that.
@@ -62,10 +73,10 @@ def _validate_conns(connections):
assert(s["type"] in ["message_router", "data_router"])
if s["type"] == "message_router":
_assert_ks_in_d(["aaf_username", "aaf_password", "client_role"], s) #I am not checking that these are not blank. I will leave it possible for you to put empty values for these, but force you to acknowledge that you are doing so by not allowing these to be ommited.
- #nothing extra for DR; no AAF, no client role.
+ #nothing extra for DR; no AAF, no client role.
except:
raise BadConnections("Bad Connections definition in blueprint") #is a NoneRecoverable
-
+
def _streams_iterator(streams):
"""
helper function for iterating over streams_publishes and subscribes
@@ -73,7 +84,7 @@ def _streams_iterator(streams):
"""
for_config = {}
for s in streams:
- if s["type"] == "message_router":
+ if s["type"] == "message_router":
#set the properties the DMaaP plugin needs
ctx.instance.runtime_properties[s["name"]] = {"client_role" : s["client_role"], "location" : s["location"]}
#form (or append to) the dict the component will get, including the template for the CBS
@@ -96,6 +107,19 @@ def _services_calls_iterator(services_calls):
for_config[s["config_key"]] = "{{ " + s["service_component_type"] + " }}" #will get bound by CBS
return for_config
+
+######################
+# TEMPORARY!!!!!!
+# THIS WILL GO AWAY ONCE ALEX HAS A NODE TYPE AND PLUGIN
+######################
+@operation
+@Policies.populate_policy_on_node
+def policy_get(**kwargs):
+ """decorate with @Policies.populate_policy_on_node on dcae.policy node to
+ retrieve the latest policy_body for policy_id property and save it in runtime_properties
+ """
+ pass
+
######################
# Cloudify Operations
######################
@@ -108,19 +132,19 @@ def create(connected_broker_dns_name, **kwargs):
#fail fast
_validate_conns(ctx.node.properties["connections"])
-
+
#The config binding service needs to know whether cdap or docker. Currently (aug 1 2018) it looks for "cdap_app" in the name
service_component_name = "{0}_cdap_app_{1}".format(str(uuid.uuid4()).replace("-",""), ctx.node.properties["service_component_type"])
#set this into a runtime dictionary
ctx.instance.runtime_properties[SERVICE_COMPONENT_NAME] = service_component_name
-
+
#fetch the broker name from inputs and set it in runtime properties so other functions can use it
ctx.instance.runtime_properties[SELECTED_BROKER] = connected_broker_dns_name
-
+
#set the properties the DMaap plugin expects for message router
#see the README for the structures of these keys
- #NOTE! This has to be done in create because Jack's DMaaP plugin expects to do it's thing in preconfigure.
+ #NOTE! This has to be done in create because Jack's DMaaP plugin expects to do it's thing in preconfigure.
# and we need to get this key into consul before start
#set this as a runtime property for start to use
ctx.instance.runtime_properties[PUB_C] = _streams_iterator(ctx.node.properties["connections"][STREAMS_PUBLISHES])
@@ -128,6 +152,7 @@ def create(connected_broker_dns_name, **kwargs):
ctx.instance.runtime_properties[SER_C] = _services_calls_iterator(ctx.node.properties["connections"][SERVICES_CALLS])
@operation
+@Policies.gather_policies_to_node
def deploy_and_start_application(**kwargs):
"""
pushes the application into the workspace and starts it
@@ -158,17 +183,24 @@ def deploy_and_start_application(**kwargs):
programs = ctx.node.properties["programs"],
program_preferences = ctx.node.properties["program_preferences"],
logger = ctx.logger)
-
- except Exception as e:
- ctx.logger.error("Error depploying CDAP app: {er}".format(er=e))
+
+ #TODO! Would be better to do an initial merge first before deploying, but the merge is complicated for CDAP
+ #because of app config vs. app preferences. So, for now, let the broker do the work with an immediate reconfigure
+ #get policies that may have changed prior to this blueprint deployment
+ policy_configs = Policies.get_policy_configs()
+ ctx.logger.info("Updated policy configs: {0}".format(policy_configs))
+ _trigger_update(policy_configs)
+
+ except Exception as e:
+ ctx.logger.error("Error depploying CDAP app: {er}".format(er=e))
raise NonRecoverableError(e)
@operation
def stop_and_undeploy_application(**kwargs):
#per jack Lucas, do not raise Nonrecoverables on any delete operation. Keep going on them all, cleaning up as much as you can.
#bombing would also bomb the deletion of the rest of the blueprint
- ctx.logger.info("Undeploying CDAP application")
-
+ ctx.logger.info("Undeploying CDAP application")
+
try: #deregister with the broker, which will also take down the service from consul
discovery.delete_on_broker(ctx.instance.runtime_properties[SELECTED_BROKER],
ctx.instance.runtime_properties[SERVICE_COMPONENT_NAME],
@@ -190,8 +222,8 @@ def app_config_reconfigure(new_config_template, **kwargs):
"""
try:
ctx.logger.info("Reconfiguring CDAP application via app_config")
- discovery.reconfigure_in_broker(cdap_broker_name = ctx.instance.runtime_properties[SELECTED_BROKER],
- service_component_name = ctx.instance.runtime_properties[SERVICE_COMPONENT_NAME],
+ discovery.reconfigure_in_broker(cdap_broker_name = ctx.instance.runtime_properties[SELECTED_BROKER],
+ service_component_name = ctx.instance.runtime_properties[SERVICE_COMPONENT_NAME],
config = new_config_template, #This keyname will likely change per policy handler
reconfiguration_type = "program-flowlet-app-config",
logger = ctx.logger)
@@ -205,8 +237,8 @@ def app_preferences_reconfigure(new_config_template, **kwargs):
"""
try:
ctx.logger.info("Reconfiguring CDAP application via app_preferences")
- discovery.reconfigure_in_broker(cdap_broker_name = ctx.instance.runtime_properties[SELECTED_BROKER],
- service_component_name = ctx.instance.runtime_properties[SERVICE_COMPONENT_NAME],
+ discovery.reconfigure_in_broker(cdap_broker_name = ctx.instance.runtime_properties[SELECTED_BROKER],
+ service_component_name = ctx.instance.runtime_properties[SERVICE_COMPONENT_NAME],
config = new_config_template, #This keyname will likely change per policy handler
reconfiguration_type = "program-flowlet-app-preferences",
logger = ctx.logger)
@@ -220,8 +252,8 @@ def app_smart_reconfigure(new_config_template, **kwargs):
"""
try:
ctx.logger.info("Reconfiguring CDAP application via smart interface")
- discovery.reconfigure_in_broker(cdap_broker_name = ctx.instance.runtime_properties[SELECTED_BROKER],
- service_component_name = ctx.instance.runtime_properties[SERVICE_COMPONENT_NAME],
+ discovery.reconfigure_in_broker(cdap_broker_name = ctx.instance.runtime_properties[SELECTED_BROKER],
+ service_component_name = ctx.instance.runtime_properties[SERVICE_COMPONENT_NAME],
config = new_config_template, #This keyname will likely change per policy handler
reconfiguration_type = "program-flowlet-smart",
logger = ctx.logger)
@@ -229,6 +261,18 @@ def app_smart_reconfigure(new_config_template, **kwargs):
raise NonRecoverableError("CDAP Reconfigure error: {0}".format(e))
@operation
+@Policies.update_policies_on_node(configs_only=True)
+def policy_update(updated_policies, **kwargs):
+ #its already develiered through policy
+ ctx.logger.info("Policy update recieved. updated policies: {0}".format(updated_policies))
+ try:
+ #TODO! In the future, if we really have many different policies, would be more efficient to do a single merge here.
+ #However all use cases today are a single policy so OK with this for loop for now.
+ _trigger_update(updated_policies)
+ except Exception as e:
+ raise NonRecoverableError("CDAP Reconfigure error: {0}".format(e))
+
+@operation
def delete_all_registered_apps(connected_broker_dns_name, **kwargs):
"""
Used in the cdap broker deleter node.