diff options
Diffstat (limited to 'policy-management/src/main')
6 files changed, 596 insertions, 776 deletions
diff --git a/policy-management/src/main/java/org/onap/policy/drools/controller/IndexedDroolsControllerFactory.java b/policy-management/src/main/java/org/onap/policy/drools/controller/IndexedDroolsControllerFactory.java index 733a492d..a4c546f8 100644 --- a/policy-management/src/main/java/org/onap/policy/drools/controller/IndexedDroolsControllerFactory.java +++ b/policy-management/src/main/java/org/onap/policy/drools/controller/IndexedDroolsControllerFactory.java @@ -171,8 +171,6 @@ class IndexedDroolsControllerFactory implements DroolsControllerFactory { protected List<TopicCoderFilterConfiguration> codersAndFilters(Properties properties, List<? extends Topic> topicEntities) { - String propertyTopicEntityPrefix; - List<TopicCoderFilterConfiguration> topics2DecodedClasses2Filters = new ArrayList<>(); if (topicEntities == null || topicEntities.isEmpty()) { @@ -181,87 +179,102 @@ class IndexedDroolsControllerFactory implements DroolsControllerFactory { for (Topic topic : topicEntities) { - /* source or sink ? ueb or dmaap? */ - boolean isSource = topic instanceof TopicSource; - CommInfrastructure commInfra = topic.getTopicCommInfrastructure(); - if (commInfra == CommInfrastructure.UEB) { - if (isSource) { - propertyTopicEntityPrefix = PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS + "."; - } else { - propertyTopicEntityPrefix = PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS + "."; - } - } else if (commInfra == CommInfrastructure.DMAAP) { - if (isSource) { - propertyTopicEntityPrefix = PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."; - } else { - propertyTopicEntityPrefix = PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "."; - } - } else if (commInfra == CommInfrastructure.NOOP) { - if (isSource) { - propertyTopicEntityPrefix = PolicyEndPointProperties.PROPERTY_NOOP_SOURCE_TOPICS + "."; - } else { - propertyTopicEntityPrefix = PolicyEndPointProperties.PROPERTY_NOOP_SINK_TOPICS + "."; - } - } else { - throw new IllegalArgumentException("Invalid Communication Infrastructure: " + commInfra); - } - // 1. first the topic String firstTopic = topic.getTopic(); + String propertyTopicEntityPrefix = getPropertyTopicPrefix(topic) + firstTopic; + // 2. check if there is a custom decoder for this topic that the user prefers to use // instead of the ones provided in the platform - String customGson = properties.getProperty(propertyTopicEntityPrefix + firstTopic - + PolicyEndPointProperties.PROPERTY_TOPIC_EVENTS_CUSTOM_MODEL_CODER_GSON_SUFFIX); - - CustomGsonCoder customGsonCoder = null; - if (customGson != null && !customGson.isEmpty()) { - try { - customGsonCoder = new CustomGsonCoder(customGson); - } catch (IllegalArgumentException e) { - logger.warn("{}: cannot create custom-gson-coder {} because of {}", this, customGson, - e.getMessage(), e); - } - } + CustomGsonCoder customGsonCoder = getCustomCoder(properties, propertyTopicEntityPrefix); // 3. second the list of classes associated with each topic String eventClasses = properties - .getProperty(propertyTopicEntityPrefix + firstTopic - + PolicyEndPointProperties.PROPERTY_TOPIC_EVENTS_SUFFIX); + .getProperty(propertyTopicEntityPrefix + PolicyEndPointProperties.PROPERTY_TOPIC_EVENTS_SUFFIX); if (eventClasses == null || eventClasses.isEmpty()) { logger.warn("There are no event classes for topic {}", firstTopic); continue; } - List<PotentialCoderFilter> classes2Filters = new ArrayList<>(); + List<PotentialCoderFilter> classes2Filters = + getFilterExpressions(properties, propertyTopicEntityPrefix, eventClasses); - List<String> topicClasses = new ArrayList<>(Arrays.asList(eventClasses.split("\\s*,\\s*"))); + TopicCoderFilterConfiguration topic2Classes2Filters = + new TopicCoderFilterConfiguration(firstTopic, classes2Filters, customGsonCoder); + topics2DecodedClasses2Filters.add(topic2Classes2Filters); + } - for (String theClass : topicClasses) { + return topics2DecodedClasses2Filters; + } + private String getPropertyTopicPrefix(Topic topic) { + boolean isSource = topic instanceof TopicSource; + CommInfrastructure commInfra = topic.getTopicCommInfrastructure(); + if (commInfra == CommInfrastructure.UEB) { + if (isSource) { + return PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS + "."; + } else { + return PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS + "."; + } + } else if (commInfra == CommInfrastructure.DMAAP) { + if (isSource) { + return PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."; + } else { + return PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "."; + } + } else if (commInfra == CommInfrastructure.NOOP) { + if (isSource) { + return PolicyEndPointProperties.PROPERTY_NOOP_SOURCE_TOPICS + "."; + } else { + return PolicyEndPointProperties.PROPERTY_NOOP_SINK_TOPICS + "."; + } + } else { + throw new IllegalArgumentException("Invalid Communication Infrastructure: " + commInfra); + } + } + + private CustomGsonCoder getCustomCoder(Properties properties, String propertyPrefix) { + String customGson = properties.getProperty(propertyPrefix + + PolicyEndPointProperties.PROPERTY_TOPIC_EVENTS_CUSTOM_MODEL_CODER_GSON_SUFFIX); + + CustomGsonCoder customGsonCoder = null; + if (customGson != null && !customGson.isEmpty()) { + try { + customGsonCoder = new CustomGsonCoder(customGson); + } catch (IllegalArgumentException e) { + logger.warn("{}: cannot create custom-gson-coder {} because of {}", this, customGson, + e.getMessage(), e); + } + } + return customGsonCoder; + } - // 4. third, for each coder class, get the filter expression + private List<PotentialCoderFilter> getFilterExpressions(Properties properties, String propertyPrefix, + String eventClasses) { - String filter = properties - .getProperty(propertyTopicEntityPrefix + firstTopic - + PolicyEndPointProperties.PROPERTY_TOPIC_EVENTS_SUFFIX - + "." + theClass + PolicyEndPointProperties.PROPERTY_TOPIC_EVENTS_FILTER_SUFFIX); + List<PotentialCoderFilter> classes2Filters = new ArrayList<>(); - JsonProtocolFilter protocolFilter = new JsonProtocolFilter(filter); - PotentialCoderFilter class2Filters = new PotentialCoderFilter(theClass, protocolFilter); - classes2Filters.add(class2Filters); - } + List<String> topicClasses = new ArrayList<>(Arrays.asList(eventClasses.split("\\s*,\\s*"))); - TopicCoderFilterConfiguration topic2Classes2Filters = - new TopicCoderFilterConfiguration(firstTopic, classes2Filters, customGsonCoder); - topics2DecodedClasses2Filters.add(topic2Classes2Filters); + for (String theClass : topicClasses) { + + // 4. for each coder class, get the filter expression + + String filter = properties + .getProperty(propertyPrefix + + PolicyEndPointProperties.PROPERTY_TOPIC_EVENTS_SUFFIX + + "." + theClass + PolicyEndPointProperties.PROPERTY_TOPIC_EVENTS_FILTER_SUFFIX); + + JsonProtocolFilter protocolFilter = new JsonProtocolFilter(filter); + PotentialCoderFilter class2Filters = new PotentialCoderFilter(theClass, protocolFilter); + classes2Filters.add(class2Filters); } - return topics2DecodedClasses2Filters; + return classes2Filters; } @Override diff --git a/policy-management/src/main/java/org/onap/policy/drools/controller/internal/MavenDroolsController.java b/policy-management/src/main/java/org/onap/policy/drools/controller/internal/MavenDroolsController.java index ca1f2283..77bfcf9f 100644 --- a/policy-management/src/main/java/org/onap/policy/drools/controller/internal/MavenDroolsController.java +++ b/policy-management/src/main/java/org/onap/policy/drools/controller/internal/MavenDroolsController.java @@ -41,6 +41,7 @@ import org.kie.api.runtime.rule.QueryResultsRow; import org.onap.policy.common.endpoints.event.comm.TopicSink; import org.onap.policy.common.gson.annotation.GsonJsonIgnore; import org.onap.policy.common.gson.annotation.GsonJsonProperty; +import org.onap.policy.common.utils.services.FeatureApiUtils; import org.onap.policy.common.utils.services.OrderedServiceImpl; import org.onap.policy.drools.controller.DroolsController; import org.onap.policy.drools.controller.DroolsControllerConstants; @@ -186,25 +187,11 @@ public class MavenDroolsController implements DroolsController { logger.info("updating version -> [{}:{}:{}]", newGroupId, newArtifactId, newVersion); - if (newGroupId == null || newGroupId.isEmpty()) { - throw new IllegalArgumentException("Missing maven group-id coordinate"); - } + validateText(newGroupId, "Missing maven group-id coordinate"); + validateText(newArtifactId, "Missing maven artifact-id coordinate"); + validateText(newVersion, "Missing maven version coordinate"); - if (newArtifactId == null || newArtifactId.isEmpty()) { - throw new IllegalArgumentException("Missing maven artifact-id coordinate"); - } - - if (newVersion == null || newVersion.isEmpty()) { - throw new IllegalArgumentException("Missing maven version coordinate"); - } - - if (newGroupId.equalsIgnoreCase(DroolsControllerConstants.NO_GROUP_ID) - || newArtifactId.equalsIgnoreCase(DroolsControllerConstants.NO_ARTIFACT_ID) - || newVersion.equalsIgnoreCase(DroolsControllerConstants.NO_VERSION)) { - throw new IllegalArgumentException("BRAINLESS maven coordinates provided: " - + newGroupId + ":" + newArtifactId + ":" - + newVersion); - } + validateHasBrain(newGroupId, newArtifactId, newVersion); if (newGroupId.equalsIgnoreCase(this.getGroupId()) && newArtifactId.equalsIgnoreCase(this.getArtifactId()) @@ -214,13 +201,7 @@ public class MavenDroolsController implements DroolsController { return; } - if (!newGroupId.equalsIgnoreCase(this.getGroupId()) - || !newArtifactId.equalsIgnoreCase(this.getArtifactId())) { - throw new IllegalArgumentException( - "Group ID and Artifact ID maven coordinates must be identical for the upgrade: " - + newGroupId + ":" + newArtifactId + ":" - + newVersion + " vs. " + this); - } + validateNewVersion(newGroupId, newArtifactId, newVersion); /* upgrade */ String messages = this.policyContainer.updateToVersion(newVersion); @@ -239,6 +220,32 @@ public class MavenDroolsController implements DroolsController { logger.info("UPDATE-TO-VERSION: completed {}", this); } + private void validateText(String text, String errorMessage) { + if (text == null || text.isEmpty()) { + throw new IllegalArgumentException(errorMessage); + } + } + + private void validateHasBrain(String newGroupId, String newArtifactId, String newVersion) { + if (newGroupId.equalsIgnoreCase(DroolsControllerConstants.NO_GROUP_ID) + || newArtifactId.equalsIgnoreCase(DroolsControllerConstants.NO_ARTIFACT_ID) + || newVersion.equalsIgnoreCase(DroolsControllerConstants.NO_VERSION)) { + throw new IllegalArgumentException("BRAINLESS maven coordinates provided: " + + newGroupId + ":" + newArtifactId + ":" + + newVersion); + } + } + + private void validateNewVersion(String newGroupId, String newArtifactId, String newVersion) { + if (!newGroupId.equalsIgnoreCase(this.getGroupId()) + || !newArtifactId.equalsIgnoreCase(this.getArtifactId())) { + throw new IllegalArgumentException( + "Group ID and Artifact ID maven coordinates must be identical for the upgrade: " + + newGroupId + ":" + newArtifactId + ":" + + newVersion + " vs. " + this); + } + } + /** * initialize decoders for all the topics supported by this controller * Note this is critical to be done after the Policy Container is @@ -259,18 +266,7 @@ public class MavenDroolsController implements DroolsController { for (TopicCoderFilterConfiguration coderConfig: coderConfigurations) { String topic = coderConfig.getTopic(); - CustomGsonCoder customGsonCoder = coderConfig.getCustomGsonCoder(); - if (customGsonCoder != null - && customGsonCoder.getClassContainer() != null - && !customGsonCoder.getClassContainer().isEmpty()) { - - String customGsonCoderClass = customGsonCoder.getClassContainer(); - if (!isClass(customGsonCoderClass)) { - throw makeRetrieveEx(customGsonCoderClass); - } else { - logClassFetched(customGsonCoderClass); - } - } + CustomGsonCoder customGsonCoder = getCustomCoder(coderConfig); List<PotentialCoderFilter> coderFilters = coderConfig.getCoderFilters(); if (coderFilters == null || coderFilters.isEmpty()) { @@ -308,6 +304,22 @@ public class MavenDroolsController implements DroolsController { } } + private CustomGsonCoder getCustomCoder(TopicCoderFilterConfiguration coderConfig) { + CustomGsonCoder customGsonCoder = coderConfig.getCustomGsonCoder(); + if (customGsonCoder != null + && customGsonCoder.getClassContainer() != null + && !customGsonCoder.getClassContainer().isEmpty()) { + + String customGsonCoderClass = customGsonCoder.getClassContainer(); + if (!isClass(customGsonCoderClass)) { + throw makeRetrieveEx(customGsonCoderClass); + } else { + logClassFetched(customGsonCoderClass); + } + } + return customGsonCoder; + } + /** * Logs an error and makes an exception for an item that cannot be retrieved. * @param itemName the item to retrieve @@ -520,15 +532,11 @@ public class MavenDroolsController implements DroolsController { // Broadcast - for (DroolsControllerFeatureApi feature : getDroolsProviders().getList()) { - try { - if (feature.beforeInsert(this, event)) { - return true; - } - } catch (Exception e) { - logger.error("{}: feature {} before-insert failure because of {}", - this, feature.getClass().getName(), e.getMessage(), e); - } + if (FeatureApiUtils.apply(getDroolsProviders().getList(), + feature -> feature.beforeInsert(this, event), + (feature, ex) -> logger.error("{}: feature {} before-insert failure because of {}", this, + feature.getClass().getName(), ex.getMessage(), ex))) { + return true; } boolean successInject = this.policyContainer.insertAll(event); @@ -536,16 +544,10 @@ public class MavenDroolsController implements DroolsController { logger.warn(this + "Failed to inject into PolicyContainer {}", this.getSessionNames()); } - for (DroolsControllerFeatureApi feature : getDroolsProviders().getList()) { - try { - if (feature.afterInsert(this, event, successInject)) { - return true; - } - } catch (Exception e) { - logger.error("{}: feature {} after-insert failure because of {}", - this, feature.getClass().getName(), e.getMessage(), e); - } - } + FeatureApiUtils.apply(getDroolsProviders().getList(), + feature -> feature.afterInsert(this, event, successInject), + (feature, ex) -> logger.error("{}: feature {} after-insert failure because of {}", this, + feature.getClass().getName(), ex.getMessage(), ex)); return true; @@ -840,18 +842,7 @@ public class MavenDroolsController implements DroolsController { PolicySession session = getSession(sessionName); KieSession kieSession = session.getKieSession(); - boolean found = false; - for (KiePackage kiePackage : kieSession.getKieBase().getKiePackages()) { - for (Query q : kiePackage.getQueries()) { - if (q.getName() != null && q.getName().equals(queryName)) { - found = true; - break; - } - } - } - if (!found) { - throw new IllegalArgumentException("Invalid Query Name: " + queryName); - } + validateQueryName(kieSession, queryName); List<Object> factObjects = new ArrayList<>(); @@ -870,6 +861,18 @@ public class MavenDroolsController implements DroolsController { return factObjects; } + private void validateQueryName(KieSession kieSession, String queryName) { + for (KiePackage kiePackage : kieSession.getKieBase().getKiePackages()) { + for (Query q : kiePackage.getQueries()) { + if (q.getName() != null && q.getName().equals(queryName)) { + return; + } + } + } + + throw new IllegalArgumentException("Invalid Query Name: " + queryName); + } + @Override public <T> boolean delete(@NonNull String sessionName, @NonNull T fact) { String factClassName = fact.getClass().getName(); diff --git a/policy-management/src/main/java/org/onap/policy/drools/protocol/coders/GenericEventProtocolCoder.java b/policy-management/src/main/java/org/onap/policy/drools/protocol/coders/GenericEventProtocolCoder.java index 89a7a420..cb4ce07e 100644 --- a/policy-management/src/main/java/org/onap/policy/drools/protocol/coders/GenericEventProtocolCoder.java +++ b/policy-management/src/main/java/org/onap/policy/drools/protocol/coders/GenericEventProtocolCoder.java @@ -124,40 +124,44 @@ abstract class GenericEventProtocolCoder { coders.put(key, coderTools); - if (reverseCoders.containsKey(reverseKey)) { - // There is another controller (different group id/artifact id/topic) - // that shares the class and the topic. - - List<ProtocolCoderToolset> toolsets = - reverseCoders.get(reverseKey); - boolean present = false; - for (ProtocolCoderToolset parserSet : toolsets) { - // just doublecheck - present = parserSet.getControllerId().equals(key); - if (present) { - /* anomaly */ - logger.error( - "{}: unexpected toolset reverse mapping found for {}:{}: {}", - this, - reverseKey, - key, - parserSet); - } - } + addReverseCoder(coderTools, key, reverseKey); + } + } + private void addReverseCoder(GsonProtocolCoderToolset coderTools, String key, String reverseKey) { + if (reverseCoders.containsKey(reverseKey)) { + // There is another controller (different group id/artifact id/topic) + // that shares the class and the topic. + + List<ProtocolCoderToolset> toolsets = + reverseCoders.get(reverseKey); + boolean present = false; + for (ProtocolCoderToolset parserSet : toolsets) { + // just doublecheck + present = parserSet.getControllerId().equals(key); if (present) { - return; - } else { - logger.info("{}: adding coder set for {}: {} ", this, reverseKey, coderTools); - toolsets.add(coderTools); + /* anomaly */ + logger.error( + "{}: unexpected toolset reverse mapping found for {}:{}: {}", + this, + reverseKey, + key, + parserSet); } + } + + if (present) { + return; } else { - List<ProtocolCoderToolset> toolsets = new ArrayList<>(); + logger.info("{}: adding coder set for {}: {} ", this, reverseKey, coderTools); toolsets.add(coderTools); - - logger.info("{}: adding toolset for reverse key {}: {}", this, reverseKey, toolsets); - reverseCoders.put(reverseKey, toolsets); } + } else { + List<ProtocolCoderToolset> toolsets = new ArrayList<>(); + toolsets.add(coderTools); + + logger.info("{}: adding toolset for reverse key {}: {}", this, reverseKey, toolsets); + reverseCoders.put(reverseKey, toolsets); } } @@ -217,30 +221,36 @@ abstract class GenericEventProtocolCoder { for (CoderFilters codeFilter : coderToolset.getCoders()) { String className = codeFilter.getCodedClass(); String reverseKey = this.reverseCodersKey(topic, className); - if (this.reverseCoders.containsKey(reverseKey)) { - List<ProtocolCoderToolset> toolsets = - this.reverseCoders.get(reverseKey); - Iterator<ProtocolCoderToolset> toolsetsIter = - toolsets.iterator(); - while (toolsetsIter.hasNext()) { - ProtocolCoderToolset toolset = toolsetsIter.next(); - if (toolset.getControllerId().equals(key)) { - logger.info( - "{}: removed coder from toolset for {} from reverse mapping", this, reverseKey); - toolsetsIter.remove(); - } - } - - if (this.reverseCoders.get(reverseKey).isEmpty()) { - logger.info("{}: removing reverse mapping for {}: ", this, reverseKey); - this.reverseCoders.remove(reverseKey); - } - } + removeReverseCoder(key, reverseKey); } } } } + private void removeReverseCoder(String key, String reverseKey) { + if (!this.reverseCoders.containsKey(reverseKey)) { + return; + } + + List<ProtocolCoderToolset> toolsets = + this.reverseCoders.get(reverseKey); + Iterator<ProtocolCoderToolset> toolsetsIter = + toolsets.iterator(); + while (toolsetsIter.hasNext()) { + ProtocolCoderToolset toolset = toolsetsIter.next(); + if (toolset.getControllerId().equals(key)) { + logger.info( + "{}: removed coder from toolset for {} from reverse mapping", this, reverseKey); + toolsetsIter.remove(); + } + } + + if (this.reverseCoders.get(reverseKey).isEmpty()) { + logger.info("{}: removing reverse mapping for {}: ", this, reverseKey); + this.reverseCoders.remove(reverseKey); + } + } + /** * does it support coding. * @@ -446,20 +456,7 @@ abstract class GenericEventProtocolCoder { } for (ProtocolCoderToolset encoderSet : toolsets) { - // figure out the right toolset - String groupId = encoderSet.getGroupId(); - String artifactId = encoderSet.getArtifactId(); - List<CoderFilters> coderFilters = encoderSet.getCoders(); - for (CoderFilters coder : coderFilters) { - if (coder.getCodedClass().equals(encodedClass.getClass().getName())) { - DroolsController droolsController = - DroolsControllerConstants.getFactory().get(groupId, artifactId, ""); - if (droolsController.ownsCoder( - encodedClass.getClass(), coder.getModelClassLoaderHash())) { - droolsControllers.add(droolsController); - } - } - } + addToolsetControllers(droolsControllers, encodedClass, encoderSet); } if (droolsControllers.isEmpty()) { @@ -473,6 +470,24 @@ abstract class GenericEventProtocolCoder { return droolsControllers; } + private void addToolsetControllers(List<DroolsController> droolsControllers, Object encodedClass, + ProtocolCoderToolset encoderSet) { + // figure out the right toolset + String groupId = encoderSet.getGroupId(); + String artifactId = encoderSet.getArtifactId(); + List<CoderFilters> coderFilters = encoderSet.getCoders(); + for (CoderFilters coder : coderFilters) { + if (coder.getCodedClass().equals(encodedClass.getClass().getName())) { + DroolsController droolsController = + DroolsControllerConstants.getFactory().get(groupId, artifactId, ""); + if (droolsController.ownsCoder( + encodedClass.getClass(), coder.getModelClassLoaderHash())) { + droolsControllers.add(droolsController); + } + } + } + } + /** * get all filters by maven coordinates and topic. * diff --git a/policy-management/src/main/java/org/onap/policy/drools/system/PolicyController.java b/policy-management/src/main/java/org/onap/policy/drools/system/PolicyController.java index 17247f41..82cd015e 100644 --- a/policy-management/src/main/java/org/onap/policy/drools/system/PolicyController.java +++ b/policy-management/src/main/java/org/onap/policy/drools/system/PolicyController.java @@ -48,12 +48,12 @@ public interface PolicyController extends Startable, Lockable { /** * Get the topic readers of interest for this controller. */ - List<? extends TopicSource> getTopicSources(); + List<TopicSource> getTopicSources(); /** * Get the topic readers of interest for this controller. */ - List<? extends TopicSink> getTopicSinks(); + List<TopicSink> getTopicSinks(); /** * Get the Drools Controller. diff --git a/policy-management/src/main/java/org/onap/policy/drools/system/PolicyEngineManager.java b/policy-management/src/main/java/org/onap/policy/drools/system/PolicyEngineManager.java index 36d8ca59..32e3f674 100644 --- a/policy-management/src/main/java/org/onap/policy/drools/system/PolicyEngineManager.java +++ b/policy-management/src/main/java/org/onap/policy/drools/system/PolicyEngineManager.java @@ -24,6 +24,7 @@ import static org.onap.policy.drools.system.PolicyEngineConstants.TELEMETRY_SERV import static org.onap.policy.drools.system.PolicyEngineConstants.TELEMETRY_SERVER_DEFAULT_NAME; import static org.onap.policy.drools.system.PolicyEngineConstants.TELEMETRY_SERVER_DEFAULT_PORT; +import com.att.aft.dme2.internal.apache.commons.lang3.StringUtils; import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.gson.Gson; @@ -31,6 +32,11 @@ import com.google.gson.GsonBuilder; import java.util.ArrayList; import java.util.List; import java.util.Properties; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.BiConsumer; +import java.util.function.Consumer; +import java.util.stream.Stream; +import lombok.Getter; import org.onap.policy.common.endpoints.event.comm.Topic; import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure; import org.onap.policy.common.endpoints.event.comm.TopicEndpoint; @@ -43,6 +49,7 @@ import org.onap.policy.common.endpoints.http.server.HttpServletServerFactoryInst import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties; import org.onap.policy.common.gson.annotation.GsonJsonIgnore; import org.onap.policy.common.gson.annotation.GsonJsonProperty; +import org.onap.policy.common.utils.services.FeatureApiUtils; import org.onap.policy.drools.controller.DroolsController; import org.onap.policy.drools.controller.DroolsControllerConstants; import org.onap.policy.drools.core.PolicyContainer; @@ -89,11 +96,13 @@ class PolicyEngineManager implements PolicyEngine { /** * Is the Policy Engine running. */ + @Getter private volatile boolean alive = false; /** * Is the engine locked. */ + @Getter private volatile boolean locked = false; /** @@ -109,16 +118,19 @@ class PolicyEngineManager implements PolicyEngine { /** * Policy Engine Sources. */ - private List<? extends TopicSource> sources = new ArrayList<>(); + @Getter + private List<TopicSource> sources = new ArrayList<>(); /** * Policy Engine Sinks. */ - private List<? extends TopicSink> sinks = new ArrayList<>(); + @Getter + private List<TopicSink> sinks = new ArrayList<>(); /** * Policy Engine HTTP Servers. */ + @Getter private List<HttpServletServer> httpServers = new ArrayList<>(); /** @@ -130,15 +142,11 @@ class PolicyEngineManager implements PolicyEngine { @Override public synchronized void boot(String[] cliArgs) { - for (final PolicyEngineFeatureApi feature : getEngineProviders()) { - try { - if (feature.beforeBoot(this, cliArgs)) { - return; - } - } catch (final Exception e) { - logger.error("{}: feature {} before-boot failure because of {}", this, feature.getClass().getName(), - e.getMessage(), e); - } + if (FeatureApiUtils.apply(getEngineProviders(), + feature -> feature.beforeBoot(this, cliArgs), + (feature, ex) -> logger.error("{}: feature {} before-boot failure because of {}", this, + feature.getClass().getName(), ex.getMessage(), ex))) { + return; } try { @@ -147,16 +155,10 @@ class PolicyEngineManager implements PolicyEngine { logger.error("{}: cannot init policy-container because of {}", this, e.getMessage(), e); } - for (final PolicyEngineFeatureApi feature : getEngineProviders()) { - try { - if (feature.afterBoot(this)) { - return; - } - } catch (final Exception e) { - logger.error("{}: feature {} after-boot failure because of {}", this, feature.getClass().getName(), - e.getMessage(), e); - } - } + FeatureApiUtils.apply(getEngineProviders(), + feature -> feature.afterBoot(this), + (feature, ex) -> logger.error("{}: feature {} after-boot failure because of {}", this, + feature.getClass().getName(), ex.getMessage(), ex)); } @Override @@ -220,15 +222,11 @@ class PolicyEngineManager implements PolicyEngine { } /* policy-engine dispatch pre configure hook */ - for (final PolicyEngineFeatureApi feature : getEngineProviders()) { - try { - if (feature.beforeConfigure(this, properties)) { - return; - } - } catch (final Exception e) { - logger.error("{}: feature {} before-configure failure because of {}", this, - feature.getClass().getName(), e.getMessage(), e); - } + if (FeatureApiUtils.apply(getEngineProviders(), + feature -> feature.beforeConfigure(this, properties), + (feature, ex) -> logger.error("{}: feature {} before-configure failure because of {}", this, + feature.getClass().getName(), ex.getMessage(), ex))) { + return; } this.properties = properties; @@ -260,16 +258,10 @@ class PolicyEngineManager implements PolicyEngine { } /* policy-engine dispatch post configure hook */ - for (final PolicyEngineFeatureApi feature : getEngineProviders()) { - try { - if (feature.afterConfigure(this)) { - return; - } - } catch (final Exception e) { - logger.error("{}: feature {} after-configure failure because of {}", this, feature.getClass().getName(), - e.getMessage(), e); - } - } + FeatureApiUtils.apply(getEngineProviders(), + feature -> feature.afterConfigure(this), + (feature, ex) -> logger.error("{}: feature {} after-configure failure because of {}", this, + feature.getClass().getName(), ex.getMessage(), ex)); } @Override @@ -337,16 +329,11 @@ class PolicyEngineManager implements PolicyEngine { } // feature hook - for (final PolicyControllerFeatureApi controllerFeature : getControllerProviders()) { - try { - if (controllerFeature.afterCreate(controller)) { - return controller; - } - } catch (final Exception e) { - logger.error("{}: feature {} after-controller-create failure because of {}", this, - controllerFeature.getClass().getName(), e.getMessage(), e); - } - } + PolicyController controller2 = controller; + FeatureApiUtils.apply(getControllerProviders(), + feature -> feature.afterCreate(controller2), + (feature, ex) -> logger.error("{}: feature {} after-controller-create failure because of {}", + this, feature.getClass().getName(), ex.getMessage(), ex)); return controller; } @@ -393,7 +380,6 @@ class PolicyEngineManager implements PolicyEngine { throw new IllegalArgumentException("No controller configuration has been provided"); } - PolicyController policyController = null; try { final String operation = configController.getOperation(); if (operation == null || operation.isEmpty()) { @@ -401,75 +387,14 @@ class PolicyEngineManager implements PolicyEngine { throw new IllegalArgumentException("operation must be provided"); } - try { - policyController = getControllerFactory().get(controllerName); - } catch (final IllegalArgumentException e) { - // not found - logger.warn("Policy Controller " + controllerName + " not found", e); - } - + PolicyController policyController = getController(controllerName); if (policyController == null) { - - if (operation.equalsIgnoreCase(ControllerConfiguration.CONFIG_CONTROLLER_OPERATION_LOCK) - || operation.equalsIgnoreCase(ControllerConfiguration.CONFIG_CONTROLLER_OPERATION_UNLOCK)) { - throw new IllegalArgumentException(controllerName + " is not available for operation " + operation); - } - - /* Recovery case */ - - logger.warn("controller {} does not exist. Attempting recovery from disk", controllerName); - - final Properties controllerProperties = - getPersistenceManager().getControllerProperties(controllerName); - - /* - * returned properties cannot be null (per implementation) assert (properties != - * null) - */ - - if (controllerProperties == null) { - throw new IllegalArgumentException(controllerName + " is invalid"); - } - - logger.warn("controller being recovered. {} Reset controller's bad maven coordinates to brainless", - controllerName); - - /* - * try to bring up bad controller in brainless mode, after having it - * working, apply the new create/update operation. - */ - controllerProperties.setProperty(DroolsPropertyConstants.RULES_GROUPID, - DroolsControllerConstants.NO_GROUP_ID); - controllerProperties.setProperty(DroolsPropertyConstants.RULES_ARTIFACTID, - DroolsControllerConstants.NO_ARTIFACT_ID); - controllerProperties.setProperty(DroolsPropertyConstants.RULES_VERSION, - DroolsControllerConstants.NO_VERSION); - - policyController = getPolicyEngine().createPolicyController(controllerName, controllerProperties); + policyController = findController(controllerName, operation); /* fall through to do brain update operation */ } - switch (operation) { - case ControllerConfiguration.CONFIG_CONTROLLER_OPERATION_CREATE: - getControllerFactory().patch(policyController, configController.getDrools()); - break; - case ControllerConfiguration.CONFIG_CONTROLLER_OPERATION_UPDATE: - policyController.unlock(); - getControllerFactory().patch(policyController, configController.getDrools()); - break; - case ControllerConfiguration.CONFIG_CONTROLLER_OPERATION_LOCK: - policyController.lock(); - break; - case ControllerConfiguration.CONFIG_CONTROLLER_OPERATION_UNLOCK: - policyController.unlock(); - break; - default: - final String msg = "Controller Operation Configuration is not supported: " + operation + " for " - + controllerName; - logger.warn(msg); - throw new IllegalArgumentException(msg); - } + updateController(controllerName, policyController, operation, configController); return policyController; } catch (final Exception e) { @@ -481,84 +406,135 @@ class PolicyEngineManager implements PolicyEngine { } } + private PolicyController getController(final String controllerName) { + PolicyController policyController = null; + try { + policyController = getControllerFactory().get(controllerName); + } catch (final IllegalArgumentException e) { + // not found + logger.warn("Policy Controller " + controllerName + " not found", e); + } + return policyController; + } + + private PolicyController findController(final String controllerName, final String operation) { + if (operation.equalsIgnoreCase(ControllerConfiguration.CONFIG_CONTROLLER_OPERATION_LOCK) + || operation.equalsIgnoreCase(ControllerConfiguration.CONFIG_CONTROLLER_OPERATION_UNLOCK)) { + throw new IllegalArgumentException(controllerName + " is not available for operation " + operation); + } + + /* Recovery case */ + + logger.warn("controller {} does not exist. Attempting recovery from disk", controllerName); + + final Properties controllerProperties = + getPersistenceManager().getControllerProperties(controllerName); + + /* + * returned properties cannot be null (per implementation) assert (properties != + * null) + */ + + if (controllerProperties == null) { + throw new IllegalArgumentException(controllerName + " is invalid"); + } + + logger.warn("controller being recovered. {} Reset controller's bad maven coordinates to brainless", + controllerName); + + /* + * try to bring up bad controller in brainless mode, after having it + * working, apply the new create/update operation. + */ + controllerProperties.setProperty(DroolsPropertyConstants.RULES_GROUPID, + DroolsControllerConstants.NO_GROUP_ID); + controllerProperties.setProperty(DroolsPropertyConstants.RULES_ARTIFACTID, + DroolsControllerConstants.NO_ARTIFACT_ID); + controllerProperties.setProperty(DroolsPropertyConstants.RULES_VERSION, + DroolsControllerConstants.NO_VERSION); + + return getPolicyEngine().createPolicyController(controllerName, controllerProperties); + } + + private void updateController(final String controllerName, PolicyController policyController, + final String operation, ControllerConfiguration configController) { + switch (operation) { + case ControllerConfiguration.CONFIG_CONTROLLER_OPERATION_CREATE: + getControllerFactory().patch(policyController, configController.getDrools()); + break; + case ControllerConfiguration.CONFIG_CONTROLLER_OPERATION_UPDATE: + policyController.unlock(); + getControllerFactory().patch(policyController, configController.getDrools()); + break; + case ControllerConfiguration.CONFIG_CONTROLLER_OPERATION_LOCK: + policyController.lock(); + break; + case ControllerConfiguration.CONFIG_CONTROLLER_OPERATION_UNLOCK: + policyController.unlock(); + break; + default: + final String msg = "Controller Operation Configuration is not supported: " + operation + " for " + + controllerName; + logger.warn(msg); + throw new IllegalArgumentException(msg); + } + } + @Override public synchronized boolean start() { /* policy-engine dispatch pre start hook */ - for (final PolicyEngineFeatureApi feature : getEngineProviders()) { - try { - if (feature.beforeStart(this)) { - return true; - } - } catch (final Exception e) { - logger.error("{}: feature {} before-start failure because of {}", this, feature.getClass().getName(), - e.getMessage(), e); - } + if (FeatureApiUtils.apply(getEngineProviders(), + feature -> feature.beforeStart(this), + (feature, ex) -> logger.error("{}: feature {} before-start failure because of {}", this, + feature.getClass().getName(), ex.getMessage(), ex))) { + return true; } - boolean success = true; if (this.locked) { throw new IllegalStateException(ENGINE_LOCKED_MSG); } this.alive = true; + AtomicReference<Boolean> success = new AtomicReference<>(true); + /* Start Policy Engine exclusively-owned (unmanaged) http servers */ - for (final HttpServletServer httpServer : this.httpServers) { - try { - if (!httpServer.waitedStart(10 * 1000L)) { - success = false; - } - } catch (final Exception e) { - logger.error("{}: cannot start http-server {} because of {}", this, httpServer, e.getMessage(), e); - } - } + attempt(success, this.httpServers, + httpServer -> httpServer.waitedStart(10 * 1000L), + (item, ex) -> logger.error("{}: cannot start http-server {} because of {}", + this, item, ex.getMessage(), ex)); /* Start Policy Engine exclusively-owned (unmanaged) sources */ - for (final TopicSource source : this.sources) { - try { - if (!source.start()) { - success = false; - } - } catch (final Exception e) { - logger.error("{}: cannot start topic-source {} because of {}", this, source, e.getMessage(), e); - } - } + attempt(success, this.sources, + TopicSource::start, + (item, ex) -> logger.error("{}: cannot start topic-source {} because of {}", + this, item, ex.getMessage(), ex)); /* Start Policy Engine owned (unmanaged) sinks */ - for (final TopicSink sink : this.sinks) { - try { - if (!sink.start()) { - success = false; - } - } catch (final Exception e) { - logger.error("{}: cannot start topic-sink {} because of {}", this, sink, e.getMessage(), e); - } - } + attempt(success, this.sinks, + TopicSink::start, + (item, ex) -> logger.error("{}: cannot start topic-sink {} because of {}", + this, item, ex.getMessage(), ex)); /* Start Policy Controllers */ - final List<PolicyController> controllers = getControllerFactory().inventory(); - for (final PolicyController controller : controllers) { - try { - if (!controller.start()) { - success = false; - } - } catch (final Exception e) { - logger.error("{}: cannot start policy-controller {} because of {}", this, controller, e.getMessage(), - e); - success = false; - } - } + attempt(success, getControllerFactory().inventory(), + PolicyController::start, + (item, ex) -> { + logger.error("{}: cannot start policy-controller {} because of {}", this, item, + ex.getMessage(), ex); + success.set(false); + }); /* Start managed Topic Endpoints */ try { if (!getTopicEndpointManager().start()) { - success = false; + success.set(false); } } catch (final IllegalStateException e) { logger.warn("{}: Topic Endpoint Manager is in an invalid state because of {}", this, e.getMessage(), e); @@ -570,109 +546,80 @@ class PolicyEngineManager implements PolicyEngine { startPdpJmxListener(); /* policy-engine dispatch after start hook */ - for (final PolicyEngineFeatureApi feature : getEngineProviders()) { - try { - if (feature.afterStart(this)) { - return success; - } - } catch (final Exception e) { - logger.error("{}: feature {} after-start failure because of {}", this, feature.getClass().getName(), - e.getMessage(), e); - } - } + FeatureApiUtils.apply(getEngineProviders(), + feature -> feature.afterStart(this), + (feature, ex) -> logger.error("{}: feature {} after-start failure because of {}", this, + feature.getClass().getName(), ex.getMessage(), ex)); - return success; + return success.get(); + } + + @FunctionalInterface + private static interface PredicateWithEx<T> { + public boolean test(T value) throws InterruptedException; } @Override public synchronized boolean stop() { /* policy-engine dispatch pre stop hook */ - for (final PolicyEngineFeatureApi feature : getEngineProviders()) { - try { - if (feature.beforeStop(this)) { - return true; - } - } catch (final Exception e) { - logger.error("{}: feature {} before-stop failure because of {}", this, feature.getClass().getName(), - e.getMessage(), e); - } + if (FeatureApiUtils.apply(getEngineProviders(), + feature -> feature.beforeStop(this), + (feature, ex) -> logger.error("{}: feature {} before-stop failure because of {}", this, + feature.getClass().getName(), ex.getMessage(), ex))) { + return true; } /* stop regardless of the lock state */ - boolean success = true; if (!this.alive) { return true; } this.alive = false; - final List<PolicyController> controllers = getControllerFactory().inventory(); - for (final PolicyController controller : controllers) { - try { - if (!controller.stop()) { - success = false; - } - } catch (final Exception e) { - logger.error("{}: cannot stop policy-controller {} because of {}", this, controller, e.getMessage(), e); - success = false; - } - } + AtomicReference<Boolean> success = new AtomicReference<>(true); + + attempt(success, getControllerFactory().inventory(), + PolicyController::stop, + (item, ex) -> { + logger.error("{}: cannot stop policy-controller {} because of {}", this, item, + ex.getMessage(), ex); + success.set(false); + }); /* Stop Policy Engine owned (unmanaged) sources */ - for (final TopicSource source : this.sources) { - try { - if (!source.stop()) { - success = false; - } - } catch (final Exception e) { - logger.error("{}: cannot start topic-source {} because of {}", this, source, e.getMessage(), e); - } - } + attempt(success, this.sources, + TopicSource::stop, + (item, ex) -> logger.error("{}: cannot stop topic-source {} because of {}", this, item, + ex.getMessage(), ex)); /* Stop Policy Engine owned (unmanaged) sinks */ - for (final TopicSink sink : this.sinks) { - try { - if (!sink.stop()) { - success = false; - } - } catch (final Exception e) { - logger.error("{}: cannot start topic-sink {} because of {}", this, sink, e.getMessage(), e); - } - } + attempt(success, this.sinks, + TopicSink::stop, + (item, ex) -> logger.error("{}: cannot stop topic-sink {} because of {}", this, item, + ex.getMessage(), ex)); /* stop all managed topics sources and sinks */ if (!getTopicEndpointManager().stop()) { - success = false; + success.set(false); } /* stop all unmanaged http servers */ - for (final HttpServletServer httpServer : this.httpServers) { - try { - if (!httpServer.stop()) { - success = false; - } - } catch (final Exception e) { - logger.error("{}: cannot start http-server {} because of {}", this, httpServer, e.getMessage(), e); - } - } + attempt(success, this.httpServers, + HttpServletServer::stop, + (item, ex) -> logger.error("{}: cannot stop http-server {} because of {}", this, item, + ex.getMessage(), ex)); // stop JMX? /* policy-engine dispatch pre stop hook */ - for (final PolicyEngineFeatureApi feature : getEngineProviders()) { - try { - if (feature.afterStop(this)) { - return success; - } - } catch (final Exception e) { - logger.error("{}: feature {} after-stop failure because of {}", this, feature.getClass().getName(), - e.getMessage(), e); - } - } + FeatureApiUtils.apply(getEngineProviders(), + feature -> feature.afterStop(this), + (feature, ex) -> logger.error("{}: feature {} after-stop failure because of {}", this, + feature.getClass().getName(), ex.getMessage(), ex)); - return success; + return success.get(); } @Override @@ -687,36 +634,26 @@ class PolicyEngineManager implements PolicyEngine { exitThread.start(); /* policy-engine dispatch pre shutdown hook */ - for (final PolicyEngineFeatureApi feature : getEngineProviders()) { - try { - if (feature.beforeShutdown(this)) { - return; - } - } catch (final Exception e) { - logger.error("{}: feature {} before-shutdown failure because of {}", this, feature.getClass().getName(), - e.getMessage(), e); - } + if (FeatureApiUtils.apply(getEngineProviders(), + feature -> feature.beforeShutdown(this), + (feature, ex) -> logger.error("{}: feature {} before-shutdown failure because of {}", this, + feature.getClass().getName(), ex.getMessage(), ex))) { + return; } this.alive = false; /* Shutdown Policy Engine owned (unmanaged) sources */ - for (final TopicSource source : this.sources) { - try { - source.shutdown(); - } catch (final Exception e) { - logger.error("{}: cannot shutdown topic-source {} because of {}", this, source, e.getMessage(), e); - } - } + applyAll(this.sources, + TopicSource::shutdown, + (item, ex) -> logger.error("{}: cannot shutdown topic-source {} because of {}", this, item, + ex.getMessage(), ex)); /* Shutdown Policy Engine owned (unmanaged) sinks */ - for (final TopicSink sink : this.sinks) { - try { - sink.shutdown(); - } catch (final Exception e) { - logger.error("{}: cannot shutdown topic-sink {} because of {}", this, sink, e.getMessage(), e); - } - } + applyAll(this.sinks, + TopicSink::shutdown, + (item, ex) -> logger.error("{}: cannot shutdown topic-sink {} because of {}", this, item, + ex.getMessage(), ex)); /* Shutdown managed resources */ getControllerFactory().shutdown(); @@ -728,19 +665,45 @@ class PolicyEngineManager implements PolicyEngine { stopPdpJmxListener(); /* policy-engine dispatch post shutdown hook */ - for (final PolicyEngineFeatureApi feature : getEngineProviders()) { + FeatureApiUtils.apply(getEngineProviders(), + feature -> feature.afterShutdown(this), + (feature, ex) -> logger.error("{}: feature {} after-shutdown failure because of {}", this, + feature.getClass().getName(), ex.getMessage(), ex)); + + exitThread.interrupt(); + logger.info("{}: normal termination", this); + } + + private <T> void attempt(AtomicReference<Boolean> success, List<T> items, PredicateWithEx<T> pred, + BiConsumer<T,Exception> handleEx) { + + for (T item : items) { try { - if (feature.afterShutdown(this)) { - return; + if (!pred.test(item)) { + success.set(false); } - } catch (final Exception e) { - logger.error("{}: feature {} after-shutdown failure because of {}", this, feature.getClass().getName(), - e.getMessage(), e); + + } catch (InterruptedException ex) { + handleEx.accept(item, ex); + Thread.currentThread().interrupt(); + + } catch (RuntimeException ex) { + handleEx.accept(item, ex); } } + } - exitThread.interrupt(); - logger.info("{}: normal termination", this); + private <T> void applyAll(List<T> items, Consumer<T> function, + BiConsumer<T,Exception> handleEx) { + + for (T item : items) { + try { + function.accept(item); + + } catch (RuntimeException ex) { + handleEx.accept(item, ex); + } + } } /** @@ -764,14 +727,10 @@ class PolicyEngineManager implements PolicyEngine { /* * shut down the Policy Engine owned http servers as the very last thing */ - for (final HttpServletServer httpServer : PolicyEngineManager.this.getHttpServers()) { - try { - httpServer.shutdown(); - } catch (final Exception e) { - logger.error("{}: cannot shutdown http-server {} because of {}", PolicyEngineManager.this, - httpServer, e.getMessage(), e); - } - } + applyAll(PolicyEngineManager.this.getHttpServers(), + HttpServletServer::shutdown, + (item, ex) -> logger.error("{}: cannot shutdown http-server {} because of {}", this, item, + ex.getMessage(), ex)); logger.info("{}: exit", PolicyEngineManager.this); doExit(0); @@ -790,23 +749,14 @@ class PolicyEngineManager implements PolicyEngine { } @Override - public boolean isAlive() { - return this.alive; - } - - @Override public synchronized boolean lock() { /* policy-engine dispatch pre lock hook */ - for (final PolicyEngineFeatureApi feature : getEngineProviders()) { - try { - if (feature.beforeLock(this)) { - return true; - } - } catch (final Exception e) { - logger.error("{}: feature {} before-lock failure because of {}", this, feature.getClass().getName(), - e.getMessage(), e); - } + if (FeatureApiUtils.apply(getEngineProviders(), + feature -> feature.beforeLock(this), + (feature, ex) -> logger.error("{}: feature {} before-lock failure because of {}", this, + feature.getClass().getName(), ex.getMessage(), ex))) { + return true; } if (this.locked) { @@ -829,16 +779,10 @@ class PolicyEngineManager implements PolicyEngine { success = getTopicEndpointManager().lock() && success; /* policy-engine dispatch post lock hook */ - for (final PolicyEngineFeatureApi feature : getEngineProviders()) { - try { - if (feature.afterLock(this)) { - return success; - } - } catch (final Exception e) { - logger.error("{}: feature {} after-lock failure because of {}", this, feature.getClass().getName(), - e.getMessage(), e); - } - } + FeatureApiUtils.apply(getEngineProviders(), + feature -> feature.afterLock(this), + (feature, ex) -> logger.error("{}: feature {} after-lock failure because of {}", this, + feature.getClass().getName(), ex.getMessage(), ex)); return success; } @@ -847,15 +791,11 @@ class PolicyEngineManager implements PolicyEngine { public synchronized boolean unlock() { /* policy-engine dispatch pre unlock hook */ - for (final PolicyEngineFeatureApi feature : getEngineProviders()) { - try { - if (feature.beforeUnlock(this)) { - return true; - } - } catch (final Exception e) { - logger.error("{}: feature {} before-unlock failure because of {}", this, feature.getClass().getName(), - e.getMessage(), e); - } + if (FeatureApiUtils.apply(getEngineProviders(), + feature -> feature.beforeUnlock(this), + (feature, ex) -> logger.error("{}: feature {} before-unlock failure because of {}", this, + feature.getClass().getName(), ex.getMessage(), ex))) { + return true; } if (!this.locked) { @@ -879,26 +819,15 @@ class PolicyEngineManager implements PolicyEngine { success = getTopicEndpointManager().unlock() && success; /* policy-engine dispatch after unlock hook */ - for (final PolicyEngineFeatureApi feature : getEngineProviders()) { - try { - if (feature.afterUnlock(this)) { - return success; - } - } catch (final Exception e) { - logger.error("{}: feature {} after-unlock failure because of {}", this, feature.getClass().getName(), - e.getMessage(), e); - } - } + FeatureApiUtils.apply(getEngineProviders(), + feature -> feature.afterUnlock(this), + (feature, ex) -> logger.error("{}: feature {} after-unlock failure because of {}", this, + feature.getClass().getName(), ex.getMessage(), ex)); return success; } @Override - public boolean isLocked() { - return this.locked; - } - - @Override public void removePolicyController(String name) { getControllerFactory().destroy(name); } @@ -933,24 +862,6 @@ class PolicyEngineManager implements PolicyEngine { return this.properties; } - - @SuppressWarnings("unchecked") - @Override - public List<TopicSource> getSources() { - return (List<TopicSource>) this.sources; - } - - @SuppressWarnings("unchecked") - @Override - public List<TopicSink> getSinks() { - return (List<TopicSink>) this.sinks; - } - - @Override - public List<HttpServletServer> getHttpServers() { - return this.httpServers; - } - @Override public List<String> getFeatures() { final List<String> features = new ArrayList<>(); @@ -985,15 +896,12 @@ class PolicyEngineManager implements PolicyEngine { @Override public void onTopicEvent(CommInfrastructure commType, String topic, String event) { /* policy-engine pre topic event hook */ - for (final PolicyEngineFeatureApi feature : getFeatureProviders()) { - try { - if (feature.beforeOnTopicEvent(this, commType, topic, event)) { - return; - } - } catch (final Exception e) { - logger.error("{}: feature {} beforeOnTopicEvent failure on event {} because of {}", this, - feature.getClass().getName(), event, e.getMessage(), e); - } + if (FeatureApiUtils.apply(getFeatureProviders(), + feature -> feature.beforeOnTopicEvent(this, commType, topic, event), + (feature, ex) -> logger.error( + "{}: feature {} beforeOnTopicEvent failure on event {} because of {}", this, + feature.getClass().getName(), event, ex.getMessage(), ex))) { + return; } /* configuration request */ @@ -1006,16 +914,11 @@ class PolicyEngineManager implements PolicyEngine { } /* policy-engine after topic event hook */ - for (final PolicyEngineFeatureApi feature : getFeatureProviders()) { - try { - if (feature.afterOnTopicEvent(this, configuration, commType, topic, event)) { - return; - } - } catch (final Exception e) { - logger.error("{}: feature {} afterOnTopicEvent failure on event {} because of {}", this, - feature.getClass().getName(), event, e.getMessage(), e); - } - } + PdpdConfiguration configuration2 = configuration; + FeatureApiUtils.apply(getFeatureProviders(), + feature -> feature.afterOnTopicEvent(this, configuration2, commType, topic, event), + (feature, ex) -> logger.error("{}: feature {} afterOnTopicEvent failure on event {} because of {}", this, + feature.getClass().getName(), event, ex.getMessage(), ex)); } @Override @@ -1041,7 +944,7 @@ class PolicyEngineManager implements PolicyEngine { throw new IllegalStateException(ENGINE_LOCKED_MSG); } - final List<? extends TopicSink> topicSinks = getTopicEndpointManager().getTopicSinks(topic); + final List<TopicSink> topicSinks = getTopicEndpointManager().getTopicSinks(topic); if (topicSinks == null || topicSinks.size() != 1) { throw new IllegalStateException("Cannot ensure correct delivery on topic " + topic + ": " + topicSinks); } @@ -1056,11 +959,11 @@ class PolicyEngineManager implements PolicyEngine { * Note this entry point is usually from the DRL (one of the reasons busType is String. */ - if (busType == null || busType.isEmpty()) { + if (StringUtils.isBlank(busType)) { throw new IllegalArgumentException("Invalid Communication Infrastructure"); } - if (topic == null || topic.isEmpty()) { + if (StringUtils.isBlank(topic)) { throw new IllegalArgumentException(INVALID_TOPIC_MSG); } @@ -1068,12 +971,8 @@ class PolicyEngineManager implements PolicyEngine { throw new IllegalArgumentException(INVALID_EVENT_MSG); } - boolean valid = false; - for (final Topic.CommInfrastructure comm : Topic.CommInfrastructure.values()) { - if (comm.name().equals(busType)) { - valid = true; - } - } + boolean valid = Stream.of(Topic.CommInfrastructure.values()).map(Enum::name) + .anyMatch(name -> name.equals(busType)); if (!valid) { throw new IllegalArgumentException("Invalid Communication Infrastructure: " + busType); @@ -1181,15 +1080,11 @@ class PolicyEngineManager implements PolicyEngine { public synchronized void activate() { /* policy-engine dispatch pre activate hook */ - for (final PolicyEngineFeatureApi feature : getEngineProviders()) { - try { - if (feature.beforeActivate(this)) { - return; - } - } catch (final Exception e) { - logger.error("{}: feature {} before-activate failure because of {}", this, feature.getClass().getName(), - e.getMessage(), e); - } + if (FeatureApiUtils.apply(getEngineProviders(), + feature -> feature.beforeActivate(this), + (feature, ex) -> logger.error("{}: feature {} before-activate failure because of {}", this, + feature.getClass().getName(), ex.getMessage(), ex))) { + return; } // activate 'policy-management' @@ -1209,31 +1104,21 @@ class PolicyEngineManager implements PolicyEngine { this.unlock(); /* policy-engine dispatch post activate hook */ - for (final PolicyEngineFeatureApi feature : getEngineProviders()) { - try { - if (feature.afterActivate(this)) { - return; - } - } catch (final Exception e) { - logger.error("{}: feature {} after-activate failure because of {}", this, feature.getClass().getName(), - e.getMessage(), e); - } - } + FeatureApiUtils.apply(getEngineProviders(), + feature -> feature.afterActivate(this), + (feature, ex) -> logger.error("{}: feature {} after-activate failure because of {}", this, + feature.getClass().getName(), ex.getMessage(), ex)); } @Override public synchronized void deactivate() { /* policy-engine dispatch pre deactivate hook */ - for (final PolicyEngineFeatureApi feature : getEngineProviders()) { - try { - if (feature.beforeDeactivate(this)) { - return; - } - } catch (final Exception e) { - logger.error("{}: feature {} before-deactivate failure because of {}", this, - feature.getClass().getName(), e.getMessage(), e); - } + if (FeatureApiUtils.apply(getEngineProviders(), + feature -> feature.beforeDeactivate(this), + (feature, ex) -> logger.error("{}: feature {} before-deactivate failure because of {}", this, + feature.getClass().getName(), ex.getMessage(), ex))) { + return; } this.lock(); @@ -1248,16 +1133,10 @@ class PolicyEngineManager implements PolicyEngine { } /* policy-engine dispatch post deactivate hook */ - for (final PolicyEngineFeatureApi feature : getEngineProviders()) { - try { - if (feature.afterDeactivate(this)) { - return; - } - } catch (final Exception e) { - logger.error("{}: feature {} after-deactivate failure because of {}", this, - feature.getClass().getName(), e.getMessage(), e); - } - } + FeatureApiUtils.apply(getEngineProviders(), + feature -> feature.afterDeactivate(this), + (feature, ex) -> logger.error("{}: feature {} after-deactivate failure because of {}", this, + feature.getClass().getName(), ex.getMessage(), ex)); } private boolean controllerConfig(PdpdConfiguration config) { @@ -1269,12 +1148,8 @@ class PolicyEngineManager implements PolicyEngine { } final List<PolicyController> policyControllers = this.updatePolicyControllers(config.getControllers()); - boolean success = false; - if (!(policyControllers == null || policyControllers.isEmpty()) - && (policyControllers.size() == configControllers.size())) { - success = true; - } - return success; + return (policyControllers != null && !policyControllers.isEmpty() + && policyControllers.size() == configControllers.size()); } @Override diff --git a/policy-management/src/main/java/org/onap/policy/drools/system/internal/AggregatedPolicyController.java b/policy-management/src/main/java/org/onap/policy/drools/system/internal/AggregatedPolicyController.java index 5d915104..aa57abaf 100644 --- a/policy-management/src/main/java/org/onap/policy/drools/system/internal/AggregatedPolicyController.java +++ b/policy-management/src/main/java/org/onap/policy/drools/system/internal/AggregatedPolicyController.java @@ -34,6 +34,7 @@ import org.onap.policy.common.endpoints.event.comm.TopicListener; import org.onap.policy.common.endpoints.event.comm.TopicSink; import org.onap.policy.common.endpoints.event.comm.TopicSource; import org.onap.policy.common.gson.annotation.GsonJsonIgnore; +import org.onap.policy.common.utils.services.FeatureApiUtils; import org.onap.policy.drools.controller.DroolsController; import org.onap.policy.drools.controller.DroolsControllerConstants; import org.onap.policy.drools.controller.DroolsControllerFactory; @@ -54,6 +55,9 @@ import org.slf4j.LoggerFactory; */ public class AggregatedPolicyController implements PolicyController, TopicListener { + private static final String BEFORE_OFFER_FAILURE = "{}: feature {} before-offer failure because of {}"; + private static final String AFTER_OFFER_FAILURE = "{}: feature {} after-offer failure because of {}"; + /** * Logger. */ @@ -67,12 +71,12 @@ public class AggregatedPolicyController implements PolicyController, TopicListen /** * Abstracted Event Sources List regardless communication technology. */ - private final List<? extends TopicSource> sources; + private final List<TopicSource> sources; /** * Abstracted Event Sinks List regardless communication technology. */ - private final List<? extends TopicSink> sinks; + private final List<TopicSink> sinks; /** * Mapping topics to sinks. @@ -273,15 +277,11 @@ public class AggregatedPolicyController implements PolicyController, TopicListen public boolean start() { logger.info("{}: start", this); - for (PolicyControllerFeatureApi feature : getProviders()) { - try { - if (feature.beforeStart(this)) { - return true; - } - } catch (Exception e) { - logger.error("{}: feature {} before-start failure because of {}", this, feature.getClass().getName(), - e.getMessage(), e); - } + if (FeatureApiUtils.apply(getProviders(), + feature -> feature.beforeStart(this), + (feature, ex) -> logger.error("{}: feature {} before-start failure because of {}", this, + feature.getClass().getName(), ex.getMessage(), ex))) { + return true; } if (this.isLocked()) { @@ -312,16 +312,10 @@ public class AggregatedPolicyController implements PolicyController, TopicListen } } - for (PolicyControllerFeatureApi feature : getProviders()) { - try { - if (feature.afterStart(this)) { - return true; - } - } catch (Exception e) { - logger.error("{}: feature {} after-start failure because of {}", this, feature.getClass().getName(), - e.getMessage(), e); - } - } + FeatureApiUtils.apply(getProviders(), + feature -> feature.afterStart(this), + (feature, ex) -> logger.error("{}: feature {} after-start failure because of {}", this, + feature.getClass().getName(), ex.getMessage(), ex)); return success; } @@ -333,15 +327,11 @@ public class AggregatedPolicyController implements PolicyController, TopicListen public boolean stop() { logger.info("{}: stop", this); - for (PolicyControllerFeatureApi feature : getProviders()) { - try { - if (feature.beforeStop(this)) { - return true; - } - } catch (Exception e) { - logger.error("{}: feature {} before-stop failure because of {}", this, feature.getClass().getName(), - e.getMessage(), e); - } + if (FeatureApiUtils.apply(getProviders(), + feature -> feature.beforeStop(this), + (feature, ex) -> logger.error("{}: feature {} before-stop failure because of {}", this, + feature.getClass().getName(), ex.getMessage(), ex))) { + return true; } /* stop regardless locked state */ @@ -362,16 +352,10 @@ public class AggregatedPolicyController implements PolicyController, TopicListen boolean success = this.droolsController.stop(); - for (PolicyControllerFeatureApi feature : getProviders()) { - try { - if (feature.afterStop(this)) { - return true; - } - } catch (Exception e) { - logger.error("{}: feature {} after-stop failure because of {}", this, feature.getClass().getName(), - e.getMessage(), e); - } - } + FeatureApiUtils.apply(getProviders(), + feature -> feature.afterStop(this), + (feature, ex) -> logger.error("{}: feature {} after-stop failure because of {}", this, + feature.getClass().getName(), ex.getMessage(), ex)); return success; } @@ -383,31 +367,21 @@ public class AggregatedPolicyController implements PolicyController, TopicListen public void shutdown() { logger.info("{}: shutdown", this); - for (PolicyControllerFeatureApi feature : getProviders()) { - try { - if (feature.beforeShutdown(this)) { - return; - } - } catch (Exception e) { - logger.error("{}: feature {} before-shutdown failure because of {}", this, feature.getClass().getName(), - e.getMessage(), e); - } + if (FeatureApiUtils.apply(getProviders(), + feature -> feature.beforeShutdown(this), + (feature, ex) -> logger.error("{}: feature {} before-shutdown failure because of {}", this, + feature.getClass().getName(), ex.getMessage(), ex))) { + return; } this.stop(); getDroolsFactory().shutdown(this.droolsController); - for (PolicyControllerFeatureApi feature : getProviders()) { - try { - if (feature.afterShutdown(this)) { - return; - } - } catch (Exception e) { - logger.error("{}: feature {} after-shutdown failure because of {}", this, feature.getClass().getName(), - e.getMessage(), e); - } - } + FeatureApiUtils.apply(getProviders(), + feature -> feature.afterShutdown(this), + (feature, ex) -> logger.error("{}: feature {} after-shutdown failure because of {}", this, + feature.getClass().getName(), ex.getMessage(), ex)); } /** @@ -417,31 +391,21 @@ public class AggregatedPolicyController implements PolicyController, TopicListen public void halt() { logger.info("{}: halt", this); - for (PolicyControllerFeatureApi feature : getProviders()) { - try { - if (feature.beforeHalt(this)) { - return; - } - } catch (Exception e) { - logger.error("{}: feature {} before-halt failure because of {}", this, feature.getClass().getName(), - e.getMessage(), e); - } + if (FeatureApiUtils.apply(getProviders(), + feature -> feature.beforeHalt(this), + (feature, ex) -> logger.error("{}: feature {} before-halt failure because of {}", this, + feature.getClass().getName(), ex.getMessage(), ex))) { + return; } this.stop(); getDroolsFactory().destroy(this.droolsController); getPersistenceManager().deleteController(this.name); - for (PolicyControllerFeatureApi feature : getProviders()) { - try { - if (feature.afterHalt(this)) { - return; - } - } catch (Exception e) { - logger.error("{}: feature {} after-halt failure because of {}", this, feature.getClass().getName(), - e.getMessage(), e); - } - } + FeatureApiUtils.apply(getProviders(), + feature -> feature.afterHalt(this), + (feature, ex) -> logger.error("{}: feature {} after-halt failure because of {}", this, + feature.getClass().getName(), ex.getMessage(), ex)); } /** @@ -455,29 +419,19 @@ public class AggregatedPolicyController implements PolicyController, TopicListen return; } - for (PolicyControllerFeatureApi feature : getProviders()) { - try { - if (feature.beforeOffer(this, commType, topic, event)) { - return; - } - } catch (Exception e) { - logger.error("{}: feature {} before-offer failure because of {}", this, feature.getClass().getName(), - e.getMessage(), e); - } + if (FeatureApiUtils.apply(getProviders(), + feature -> feature.beforeOffer(this, commType, topic, event), + (feature, ex) -> logger.error(BEFORE_OFFER_FAILURE, this, + feature.getClass().getName(), ex.getMessage(), ex))) { + return; } boolean success = this.droolsController.offer(topic, event); - for (PolicyControllerFeatureApi feature : getProviders()) { - try { - if (feature.afterOffer(this, commType, topic, event, success)) { - return; - } - } catch (Exception e) { - logger.error("{}: feature {} after-offer failure because of {}", this, feature.getClass().getName(), - e.getMessage(), e); - } - } + FeatureApiUtils.apply(getProviders(), + feature -> feature.afterOffer(this, commType, topic, event, success), + (feature, ex) -> logger.error(AFTER_OFFER_FAILURE, this, + feature.getClass().getName(), ex.getMessage(), ex)); } @Override @@ -488,29 +442,19 @@ public class AggregatedPolicyController implements PolicyController, TopicListen return true; } - for (PolicyControllerFeatureApi feature : getProviders()) { - try { - if (feature.beforeOffer(this, event)) { - return true; - } - } catch (Exception e) { - logger.error("{}: feature {} before-offer failure because of {}", this, feature.getClass().getName(), - e.getMessage(), e); - } + if (FeatureApiUtils.apply(getProviders(), + feature -> feature.beforeOffer(this, event), + (feature, ex) -> logger.error(BEFORE_OFFER_FAILURE, this, + feature.getClass().getName(), ex.getMessage(), ex))) { + return true; } boolean success = this.droolsController.offer(event); - for (PolicyControllerFeatureApi feature : getProviders()) { - try { - if (feature.afterOffer(this, event, success)) { - return success; - } - } catch (Exception e) { - logger.error("{}: feature {} after-offer failure because of {}", this, feature.getClass().getName(), - e.getMessage(), e); - } - } + FeatureApiUtils.apply(getProviders(), + feature -> feature.afterOffer(this, event, success), + (feature, ex) -> logger.error(AFTER_OFFER_FAILURE, this, + feature.getClass().getName(), ex.getMessage(), ex)); return success; } @@ -527,15 +471,11 @@ public class AggregatedPolicyController implements PolicyController, TopicListen logger.debug("{}: deliver event to {}:{}: {}", this, commType, topic, event); - for (PolicyControllerFeatureApi feature : getProviders()) { - try { - if (feature.beforeDeliver(this, commType, topic, event)) { - return true; - } - } catch (Exception e) { - logger.error("{}: feature {} before-deliver failure because of {}", this, feature.getClass().getName(), - e.getMessage(), e); - } + if (FeatureApiUtils.apply(getProviders(), + feature -> feature.beforeDeliver(this, commType, topic, event), + (feature, ex) -> logger.error("{}: feature {} before-deliver failure because of {}", this, + feature.getClass().getName(), ex.getMessage(), ex))) { + return true; } if (topic == null || topic.isEmpty()) { @@ -562,16 +502,10 @@ public class AggregatedPolicyController implements PolicyController, TopicListen boolean success = this.droolsController.deliver(this.topic2Sinks.get(topic), event); - for (PolicyControllerFeatureApi feature : getProviders()) { - try { - if (feature.afterDeliver(this, commType, topic, event, success)) { - return success; - } - } catch (Exception e) { - logger.error("{}: feature {} after-deliver failure because of {}", this, feature.getClass().getName(), - e.getMessage(), e); - } - } + FeatureApiUtils.apply(getProviders(), + feature -> feature.afterDeliver(this, commType, topic, event, success), + (feature, ex) -> logger.error("{}: feature {} after-deliver failure because of {}", this, + feature.getClass().getName(), ex.getMessage(), ex)); return success; } @@ -591,15 +525,11 @@ public class AggregatedPolicyController implements PolicyController, TopicListen public boolean lock() { logger.info("{}: lock", this); - for (PolicyControllerFeatureApi feature : getProviders()) { - try { - if (feature.beforeLock(this)) { - return true; - } - } catch (Exception e) { - logger.error("{}: feature {} before-lock failure because of {}", this, feature.getClass().getName(), - e.getMessage(), e); - } + if (FeatureApiUtils.apply(getProviders(), + feature -> feature.beforeLock(this), + (feature, ex) -> logger.error("{}: feature {} before-lock failure because of {}", this, + feature.getClass().getName(), ex.getMessage(), ex))) { + return true; } synchronized (this) { @@ -615,16 +545,10 @@ public class AggregatedPolicyController implements PolicyController, TopicListen boolean success = this.droolsController.lock(); - for (PolicyControllerFeatureApi feature : getProviders()) { - try { - if (feature.afterLock(this)) { - return true; - } - } catch (Exception e) { - logger.error("{}: feature {} after-lock failure because of {}", this, feature.getClass().getName(), - e.getMessage(), e); - } - } + FeatureApiUtils.apply(getProviders(), + feature -> feature.afterLock(this), + (feature, ex) -> logger.error("{}: feature {} after-lock failure because of {}", this, + feature.getClass().getName(), ex.getMessage(), ex)); return success; } @@ -637,15 +561,11 @@ public class AggregatedPolicyController implements PolicyController, TopicListen logger.info("{}: unlock", this); - for (PolicyControllerFeatureApi feature : getProviders()) { - try { - if (feature.beforeUnlock(this)) { - return true; - } - } catch (Exception e) { - logger.error("{}: feature {} before-unlock failure because of {}", this, feature.getClass().getName(), - e.getMessage(), e); - } + if (FeatureApiUtils.apply(getProviders(), + feature -> feature.beforeUnlock(this), + (feature, ex) -> logger.error("{}: feature {} before-unlock failure because of {}", this, + feature.getClass().getName(), ex.getMessage(), ex))) { + return true; } synchronized (this) { @@ -658,16 +578,10 @@ public class AggregatedPolicyController implements PolicyController, TopicListen boolean success = this.droolsController.unlock(); - for (PolicyControllerFeatureApi feature : getProviders()) { - try { - if (feature.afterUnlock(this)) { - return true; - } - } catch (Exception e) { - logger.error("{}: feature {} after-unlock failure because of {}", this, feature.getClass().getName(), - e.getMessage(), e); - } - } + FeatureApiUtils.apply(getProviders(), + feature -> feature.afterUnlock(this), + (feature, ex) -> logger.error("{}: feature {} after-unlock failure because of {}", this, + feature.getClass().getName(), ex.getMessage(), ex)); return success; } @@ -684,7 +598,7 @@ public class AggregatedPolicyController implements PolicyController, TopicListen * {@inheritDoc}. */ @Override - public List<? extends TopicSource> getTopicSources() { + public List<TopicSource> getTopicSources() { return this.sources; } @@ -692,7 +606,7 @@ public class AggregatedPolicyController implements PolicyController, TopicListen * {@inheritDoc}. */ @Override - public List<? extends TopicSink> getTopicSinks() { + public List<TopicSink> getTopicSinks() { return this.sinks; } |