aboutsummaryrefslogtreecommitdiffstats
path: root/controlloop/common/controller-tdjam/src/main/java/org/onap/policy/extension/system/NonDroolsPolicyController.java
diff options
context:
space:
mode:
Diffstat (limited to 'controlloop/common/controller-tdjam/src/main/java/org/onap/policy/extension/system/NonDroolsPolicyController.java')
-rw-r--r--controlloop/common/controller-tdjam/src/main/java/org/onap/policy/extension/system/NonDroolsPolicyController.java668
1 files changed, 668 insertions, 0 deletions
diff --git a/controlloop/common/controller-tdjam/src/main/java/org/onap/policy/extension/system/NonDroolsPolicyController.java b/controlloop/common/controller-tdjam/src/main/java/org/onap/policy/extension/system/NonDroolsPolicyController.java
new file mode 100644
index 000000000..d876bee96
--- /dev/null
+++ b/controlloop/common/controller-tdjam/src/main/java/org/onap/policy/extension/system/NonDroolsPolicyController.java
@@ -0,0 +1,668 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.extension.system;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import org.apache.commons.collections4.queue.CircularFifoQueue;
+import org.checkerframework.checker.nullness.qual.NonNull;
+import org.onap.policy.common.endpoints.event.comm.Topic;
+import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
+import org.onap.policy.common.endpoints.event.comm.TopicSink;
+import org.onap.policy.common.endpoints.event.comm.TopicSource;
+import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
+import org.onap.policy.common.utils.services.OrderedServiceImpl;
+import org.onap.policy.drools.controller.DroolsController;
+import org.onap.policy.drools.core.PolicyContainer;
+import org.onap.policy.drools.features.DroolsControllerFeatureApi;
+import org.onap.policy.drools.features.DroolsControllerFeatureApiConstants;
+import org.onap.policy.drools.protocol.coders.EventProtocolCoder;
+import org.onap.policy.drools.protocol.coders.EventProtocolCoderConstants;
+import org.onap.policy.drools.protocol.coders.EventProtocolParams;
+import org.onap.policy.drools.protocol.coders.JsonProtocolFilter;
+import org.onap.policy.drools.protocol.coders.TopicCoderFilterConfiguration;
+import org.onap.policy.drools.protocol.coders.TopicCoderFilterConfiguration.CustomGsonCoder;
+import org.onap.policy.drools.protocol.coders.TopicCoderFilterConfiguration.PotentialCoderFilter;
+import org.onap.policy.drools.system.internal.AggregatedPolicyController;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class combines the 'PolicyController' and 'DroolsController'
+ * interfaces, and provides a controller that does not have Drools running
+ * underneath. It also contains some code copied from 'MavenDroolsController'
+ * and 'NullDroolsController'. The goal is to have it look like other
+ * controllers, use the same style property file, and provide access to
+ * UEB/DMAAP message streams associated with the controller.
+ */
+public class NonDroolsPolicyController extends AggregatedPolicyController implements DroolsController {
+ /**
+ * Logger.
+ */
+ private static final Logger logger = LoggerFactory.getLogger(NonDroolsPolicyController.class);
+
+ /**
+ * The PolicyController and DroolsController factories assume that the
+ * controllers are separate objects, but in this case, the same object
+ * is used for both. We want the DroolsController 'build' method to
+ * return the same object; however, at the point the DroolsController
+ * build is taking place, the PolicyController hasn't yet been placed
+ * in any tables. The following variable is used to pass this information
+ * from one stack frame to another within the same thread.
+ */
+ private static ThreadLocal<NonDroolsPolicyController> buildInProgress = new ThreadLocal<>();
+
+ /**
+ * alive status of this drools controller,
+ * reflects invocation of start()/stop() only.
+ */
+ protected volatile boolean alive = false;
+
+ /**
+ * locked status of this drools controller,
+ * reflects if i/o drools related operations are permitted,
+ * more specifically: offer() and deliver().
+ * It does not affect the ability to start and stop
+ * underlying drools infrastructure
+ */
+ protected volatile boolean locked = false;
+
+ /**
+ * list of topics, each with associated decoder classes, each
+ * with a list of associated filters.
+ */
+ protected List<TopicCoderFilterConfiguration> decoderConfigurations;
+
+ /**
+ * list of topics, each with associated encoder classes, each
+ * with a list of associated filters.
+ */
+ protected List<TopicCoderFilterConfiguration> encoderConfigurations;
+
+ /**
+ * recent sink events processed.
+ */
+ protected final CircularFifoQueue<String> recentSinkEvents = new CircularFifoQueue<>(10);
+
+ // this is used to avoid infinite recursion in a shutdown or halt operation
+ private boolean shutdownInProgress = false;
+
+ private static Properties convert(String name, Properties properties) {
+
+ Properties newProperties = new Properties();
+ for (String pname : properties.stringPropertyNames()) {
+ newProperties.setProperty(pname, properties.getProperty(pname));
+ }
+
+ newProperties.setProperty("rules.groupId", "NonDroolsPolicyController");
+ newProperties.setProperty("rules.artifactId", name);
+ newProperties.setProperty("rules.version", "1.0");
+ return newProperties;
+ }
+
+ /**
+ * constructor -- pass parameters to superclass.
+ * @param name controller name
+ * @param properties contents of controller properties file
+ */
+ public NonDroolsPolicyController(String name, Properties properties) {
+ super(name, convert(name, properties));
+ }
+
+ /**
+ * This is used to pass the 'NonDroolsPolicyController' object to the
+ * 'DroolsPolicyBuilder' object, as the same object is used for both
+ * 'PolicyController' and 'DroolsController'.
+ *
+ * @return the NonDroolsPolicyController object ('null' if not available)
+ */
+ public static NonDroolsPolicyController getBuildInProgress() {
+ return buildInProgress.get();
+ }
+
+ protected void initDrools(Properties properties) {
+ try {
+ // Register with drools factory
+ buildInProgress.set(this);
+ this.droolsController.set(getDroolsFactory().build(properties, sources, sinks));
+ buildInProgress.set(null);
+ } catch (Exception | LinkageError e) {
+ logger.error("{}: cannot init-drools", this);
+ throw new IllegalArgumentException(e);
+ }
+
+ decoderConfigurations = codersAndFilters(properties, sources);
+ encoderConfigurations = codersAndFilters(properties, sinks);
+
+ // add to 'EventProtocolCoderConstants.getManager()' table
+ for (TopicCoderFilterConfiguration tcfc : decoderConfigurations) {
+ for (PotentialCoderFilter pcf : tcfc.getCoderFilters()) {
+ getCoderManager().addDecoder(
+ EventProtocolParams.builder()
+ .groupId(getGroupId())
+ .artifactId(getArtifactId())
+ .topic(tcfc.getTopic())
+ .eventClass(pcf.getCodedClass())
+ .protocolFilter(pcf.getFilter())
+ .customGsonCoder(tcfc.getCustomGsonCoder())
+ .modelClassLoaderHash(NonDroolsPolicyController.class.getClassLoader().hashCode()));
+ }
+ }
+ for (TopicCoderFilterConfiguration tcfc : encoderConfigurations) {
+ for (PotentialCoderFilter pcf : tcfc.getCoderFilters()) {
+ getCoderManager().addEncoder(
+ EventProtocolParams.builder()
+ .groupId(getGroupId())
+ .artifactId(getArtifactId())
+ .topic(tcfc.getTopic())
+ .eventClass(pcf.getCodedClass())
+ .protocolFilter(pcf.getFilter())
+ .customGsonCoder(tcfc.getCustomGsonCoder())
+ .modelClassLoaderHash(NonDroolsPolicyController.class.getClassLoader().hashCode()));
+ }
+ }
+ }
+
+ /*==============================*/
+ /* 'DroolsController' interface */
+ /*==============================*/
+
+ // methods copied from 'MavenDroolsController' and 'NullDroolsController'
+
+ @Override
+ public boolean start() {
+
+ logger.info("START: {}", this);
+
+ synchronized (this) {
+ if (this.alive) {
+ return true;
+ }
+ this.alive = true;
+ }
+
+ return true;
+ }
+
+ @Override
+ public boolean stop() {
+
+ logger.info("STOP: {}", this);
+
+ synchronized (this) {
+ if (!this.alive) {
+ return true;
+ }
+ this.alive = false;
+ }
+
+ return true;
+ }
+
+ @Override
+ public void shutdown() {
+ if (shutdownInProgress) {
+ // avoid infinite recursion
+ return;
+ }
+ logger.info("{}: SHUTDOWN", this);
+
+ try {
+ this.stop();
+ this.removeCoders();
+ shutdownInProgress = true;
+
+ // the following method calls 'this.shutdown' recursively
+ getDroolsFactory().shutdown(this);
+ } catch (Exception e) {
+ logger.error("{} SHUTDOWN FAILED because of {}", this, e.getMessage(), e);
+ } finally {
+ shutdownInProgress = false;
+ }
+ }
+
+ @Override
+ public void halt() {
+ if (shutdownInProgress) {
+ // avoid infinite recursion
+ return;
+ }
+ logger.info("{}: HALT", this);
+
+ try {
+ this.stop();
+ this.removeCoders();
+ shutdownInProgress = true;
+
+ // the following method calls 'this.halt' recursively
+ getDroolsFactory().destroy(this);
+ } catch (Exception e) {
+ logger.error("{} HALT FAILED because of {}", this, e.getMessage(), e);
+ } finally {
+ shutdownInProgress = false;
+ }
+ }
+
+ @Override
+ public boolean isAlive() {
+ return this.alive;
+ }
+
+ @Override
+ public boolean lock() {
+ logger.info("LOCK: {}", this);
+
+ this.locked = true;
+ return true;
+ }
+
+ @Override
+ public boolean unlock() {
+ logger.info("UNLOCK: {}", this);
+
+ this.locked = false;
+ return true;
+ }
+
+ @Override
+ public boolean isLocked() {
+ return this.locked;
+ }
+
+ @Override
+ public String getGroupId() {
+ return "NonDroolsPolicyController";
+ }
+
+ @Override
+ public String getArtifactId() {
+ return getName();
+ }
+
+ @Override
+ public String getVersion() {
+ return "1.0";
+ }
+
+ @Override
+ public List<String> getSessionNames() {
+ return new ArrayList<>();
+ }
+
+ @Override
+ public List<String> getCanonicalSessionNames() {
+ return new ArrayList<>();
+ }
+
+ @Override
+ public List<String> getBaseDomainNames() {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public boolean offer(String topic, String event) {
+ return false;
+ }
+
+ @Override
+ public <T> boolean offer(T event) {
+ return false;
+ }
+
+ @Override
+ public boolean deliver(TopicSink sink, Object event) {
+
+ // this one is from 'MavenDroolsController'
+
+ logger.info("{} DELIVER: {} FROM {} TO {}", this, event, this, sink);
+
+ for (DroolsControllerFeatureApi feature : getDroolsProviders().getList()) {
+ try {
+ if (feature.beforeDeliver(this, sink, event)) {
+ return true;
+ }
+ } catch (Exception e) {
+ logger.error("{}: feature {} before-deliver failure because of {}", this, feature.getClass().getName(),
+ e.getMessage(), e);
+ }
+ }
+
+ if (sink == null) {
+ throw new IllegalArgumentException(this + " invalid sink");
+ }
+
+ if (event == null) {
+ throw new IllegalArgumentException(this + " invalid event");
+ }
+
+ if (this.locked) {
+ throw new IllegalStateException(this + " is locked");
+ }
+
+ if (!this.alive) {
+ throw new IllegalStateException(this + " is stopped");
+ }
+
+ String json =
+ getCoderManager().encode(sink.getTopic(), event, this);
+
+ synchronized (this.recentSinkEvents) {
+ this.recentSinkEvents.add(json);
+ }
+
+ boolean success = sink.send(json);
+
+ for (DroolsControllerFeatureApi feature : getDroolsProviders().getList()) {
+ try {
+ if (feature.afterDeliver(this, sink, event, json, success)) {
+ return true;
+ }
+ } catch (Exception e) {
+ logger.error("{}: feature {} after-deliver failure because of {}", this, feature.getClass().getName(),
+ e.getMessage(), e);
+ }
+ }
+
+ return success;
+
+ }
+
+ @Override
+ public Object[] getRecentSourceEvents() {
+ return new String[0];
+ }
+
+ @Override
+ public PolicyContainer getContainer() {
+ return null;
+ }
+
+ @Override
+ public String[] getRecentSinkEvents() {
+ synchronized (this.recentSinkEvents) {
+ String[] events = new String[recentSinkEvents.size()];
+ return recentSinkEvents.toArray(events);
+ }
+ }
+
+ @Override
+ public boolean ownsCoder(Class<?> coderClass, int modelHash) {
+ //throw new IllegalStateException(makeInvokeMsg());
+ return true;
+ }
+
+ @Override
+ public Class<?> fetchModelClass(String className) {
+ try {
+ return Class.forName(className);
+ } catch (ClassNotFoundException e) {
+ throw new IllegalArgumentException(makeInvokeMsg());
+ }
+ }
+
+ @Override
+ public boolean isBrained() {
+ return true;
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder builder = new StringBuilder();
+ builder.append("NonDroolsPolicyController []");
+ return builder.toString();
+ }
+
+ @Override
+ public void updateToVersion(String newGroupId, String newArtifactId, String newVersion,
+ List<TopicCoderFilterConfiguration> decoderConfigurations,
+ List<TopicCoderFilterConfiguration> encoderConfigurations)
+ throws LinkageError {
+ throw new IllegalStateException(makeInvokeMsg());
+ }
+
+ @Override
+ public Map<String, Integer> factClassNames(String sessionName) {
+ return new HashMap<>();
+ }
+
+ @Override
+ public long factCount(String sessionName) {
+ return 0;
+ }
+
+ @Override
+ public List<Object> facts(String sessionName, String className, boolean delete) {
+ return new ArrayList<>();
+ }
+
+ @Override
+ public <T> List<T> facts(@NonNull String sessionName, @NonNull Class<T> clazz) {
+ return new ArrayList<>();
+ }
+
+ @Override
+ public List<Object> factQuery(String sessionName, String queryName,
+ String queriedEntity,
+ boolean delete, Object... queryParams) {
+ return new ArrayList<>();
+ }
+
+ @Override
+ public <T> boolean delete(@NonNull String sessionName, @NonNull T fact) {
+ return false;
+ }
+
+ @Override
+ public <T> boolean delete(@NonNull T fact) {
+ return false;
+ }
+
+ @Override
+ public <T> boolean delete(@NonNull String sessionName, @NonNull Class<T> fact) {
+ return false;
+ }
+
+ @Override
+ public <T> boolean delete(@NonNull Class<T> fact) {
+ return false;
+ }
+
+ private String makeInvokeMsg() {
+ return this.getClass().getName() + " invoked";
+ }
+
+ /**
+ * remove decoders.
+ */
+ protected void removeDecoders() {
+ logger.info("REMOVE-DECODERS: {}", this);
+
+ if (this.decoderConfigurations == null) {
+ return;
+ }
+
+
+ for (TopicCoderFilterConfiguration coderConfig: decoderConfigurations) {
+ String topic = coderConfig.getTopic();
+ getCoderManager().removeDecoders(this.getGroupId(), this.getArtifactId(), topic);
+ }
+ }
+
+ /**
+ * remove decoders.
+ */
+ protected void removeEncoders() {
+
+ logger.info("REMOVE-ENCODERS: {}", this);
+
+ if (this.encoderConfigurations == null) {
+ return;
+ }
+
+ for (TopicCoderFilterConfiguration coderConfig: encoderConfigurations) {
+ String topic = coderConfig.getTopic();
+ getCoderManager().removeEncoders(this.getGroupId(), this.getArtifactId(), topic);
+ }
+ }
+
+ /**
+ * removes this drools controllers and encoders and decoders from operation.
+ */
+ protected void removeCoders() {
+ logger.info("{}: REMOVE-CODERS", this);
+
+ try {
+ this.removeDecoders();
+ } catch (IllegalArgumentException e) {
+ logger.error("{} REMOVE-DECODERS FAILED because of {}", this, e.getMessage(), e);
+ }
+
+ try {
+ this.removeEncoders();
+ } catch (IllegalArgumentException e) {
+ logger.error("{} REMOVE-ENCODERS FAILED because of {}", this, e.getMessage(), e);
+ }
+ }
+
+ protected List<TopicCoderFilterConfiguration> codersAndFilters(Properties properties,
+ List<? extends Topic> topicEntities) {
+
+ List<TopicCoderFilterConfiguration> topics2DecodedClasses2Filters = new ArrayList<>();
+
+ if (topicEntities == null || topicEntities.isEmpty()) {
+ return topics2DecodedClasses2Filters;
+ }
+
+ for (Topic topic : topicEntities) {
+
+ // 1. first the topic
+
+ String firstTopic = topic.getTopic();
+
+ String propertyTopicEntityPrefix = getPropertyTopicPrefix(topic) + firstTopic;
+
+ // 2. check if there is a custom decoder for this topic that the user prefers to use
+ // instead of the ones provided in the platform
+
+ CustomGsonCoder customGsonCoder = getCustomCoder(properties, propertyTopicEntityPrefix);
+
+ // 3. second the list of classes associated with each topic
+
+ String eventClasses = properties
+ .getProperty(propertyTopicEntityPrefix + PolicyEndPointProperties.PROPERTY_TOPIC_EVENTS_SUFFIX);
+
+ if (eventClasses == null || eventClasses.isEmpty()) {
+ logger.warn("There are no event classes for topic {}", firstTopic);
+ continue;
+ }
+
+ List<PotentialCoderFilter> classes2Filters =
+ getFilterExpressions(properties, propertyTopicEntityPrefix, eventClasses);
+
+ TopicCoderFilterConfiguration topic2Classes2Filters =
+ new TopicCoderFilterConfiguration(firstTopic, classes2Filters, customGsonCoder);
+ topics2DecodedClasses2Filters.add(topic2Classes2Filters);
+ }
+
+ return topics2DecodedClasses2Filters;
+ }
+
+ private String getPropertyTopicPrefix(Topic topic) {
+ boolean isSource = topic instanceof TopicSource;
+ CommInfrastructure commInfra = topic.getTopicCommInfrastructure();
+ if (commInfra == CommInfrastructure.UEB) {
+ if (isSource) {
+ return PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS + ".";
+ } else {
+ return PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS + ".";
+ }
+ } else if (commInfra == CommInfrastructure.DMAAP) {
+ if (isSource) {
+ return PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + ".";
+ } else {
+ return PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + ".";
+ }
+ } else if (commInfra == CommInfrastructure.NOOP) {
+ if (isSource) {
+ return PolicyEndPointProperties.PROPERTY_NOOP_SOURCE_TOPICS + ".";
+ } else {
+ return PolicyEndPointProperties.PROPERTY_NOOP_SINK_TOPICS + ".";
+ }
+ } else {
+ throw new IllegalArgumentException("Invalid Communication Infrastructure: " + commInfra);
+ }
+ }
+
+ private CustomGsonCoder getCustomCoder(Properties properties, String propertyPrefix) {
+ String customGson = properties.getProperty(propertyPrefix
+ + PolicyEndPointProperties.PROPERTY_TOPIC_EVENTS_CUSTOM_MODEL_CODER_GSON_SUFFIX);
+
+ CustomGsonCoder customGsonCoder = null;
+ if (customGson != null && !customGson.isEmpty()) {
+ try {
+ customGsonCoder = new CustomGsonCoder(customGson);
+ } catch (IllegalArgumentException e) {
+ logger.warn("{}: cannot create custom-gson-coder {} because of {}", this, customGson,
+ e.getMessage(), e);
+ }
+ }
+ return customGsonCoder;
+ }
+
+ private List<PotentialCoderFilter> getFilterExpressions(Properties properties, String propertyPrefix,
+ String eventClasses) {
+
+ List<PotentialCoderFilter> classes2Filters = new ArrayList<>();
+
+ List<String> topicClasses = new ArrayList<>(Arrays.asList(eventClasses.split("\\s*,\\s*")));
+
+ for (String theClass : topicClasses) {
+
+ // 4. for each coder class, get the filter expression
+
+ String filter = properties
+ .getProperty(propertyPrefix
+ + PolicyEndPointProperties.PROPERTY_TOPIC_EVENTS_SUFFIX
+ + "." + theClass + PolicyEndPointProperties.PROPERTY_TOPIC_EVENTS_FILTER_SUFFIX);
+
+ JsonProtocolFilter protocolFilter = new JsonProtocolFilter(filter);
+ PotentialCoderFilter class2Filters = new PotentialCoderFilter(theClass, protocolFilter);
+ classes2Filters.add(class2Filters);
+ }
+
+ return classes2Filters;
+ }
+
+ // these may be overridden by junit tests
+
+ protected EventProtocolCoder getCoderManager() {
+ return EventProtocolCoderConstants.getManager();
+ }
+
+ protected OrderedServiceImpl<DroolsControllerFeatureApi> getDroolsProviders() {
+ return DroolsControllerFeatureApiConstants.getProviders();
+ }
+}