aboutsummaryrefslogtreecommitdiffstats
path: root/policy-management/src/main/java/org/openecomp/policy/drools/controller/internal/MavenDroolsController.java
diff options
context:
space:
mode:
Diffstat (limited to 'policy-management/src/main/java/org/openecomp/policy/drools/controller/internal/MavenDroolsController.java')
-rw-r--r--policy-management/src/main/java/org/openecomp/policy/drools/controller/internal/MavenDroolsController.java718
1 files changed, 718 insertions, 0 deletions
diff --git a/policy-management/src/main/java/org/openecomp/policy/drools/controller/internal/MavenDroolsController.java b/policy-management/src/main/java/org/openecomp/policy/drools/controller/internal/MavenDroolsController.java
new file mode 100644
index 00000000..2c5708d3
--- /dev/null
+++ b/policy-management/src/main/java/org/openecomp/policy/drools/controller/internal/MavenDroolsController.java
@@ -0,0 +1,718 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * policy-management
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.openecomp.policy.drools.controller.internal;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.collections4.queue.CircularFifoQueue;
+
+import org.openecomp.policy.common.logging.eelf.MessageCodes;
+import org.openecomp.policy.common.logging.flexlogger.FlexLogger;
+import org.openecomp.policy.common.logging.flexlogger.Logger;
+import org.openecomp.policy.drools.controller.DroolsController;
+import org.openecomp.policy.drools.core.PolicyContainer;
+import org.openecomp.policy.drools.core.PolicySession;
+import org.openecomp.policy.drools.core.jmx.PdpJmx;
+import org.openecomp.policy.drools.event.comm.TopicSink;
+import org.openecomp.policy.drools.protocol.coders.EventProtocolCoder;
+import org.openecomp.policy.drools.protocol.coders.JsonProtocolFilter;
+import org.openecomp.policy.drools.protocol.coders.TopicCoderFilterConfiguration;
+import org.openecomp.policy.drools.protocol.coders.TopicCoderFilterConfiguration.CustomGsonCoder;
+import org.openecomp.policy.drools.protocol.coders.TopicCoderFilterConfiguration.CustomJacksonCoder;
+import org.openecomp.policy.drools.protocol.coders.TopicCoderFilterConfiguration.PotentialCoderFilter;
+import org.openecomp.policy.drools.utils.ReflectionUtil;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+
+/**
+ * Maven-based Drools Controller that interacts with the
+ * policy-core PolicyContainer and PolicySession to manage
+ * Drools containers instantiated using Maven.
+ */
+public class MavenDroolsController implements DroolsController {
+
+ /**
+ * logger
+ */
+ private static Logger logger = FlexLogger.getLogger(MavenDroolsController.class);
+
+ /**
+ * Policy Container, the access object to the policy-core layer
+ */
+ @JsonIgnore
+ protected final PolicyContainer policyContainer;
+
+ /**
+ * alive status of this drools controller,
+ * reflects invocation of start()/stop() only
+ */
+ protected volatile boolean alive = false;
+
+ /**
+ * locked status of this drools controller,
+ * reflects if i/o drools related operations are permitted,
+ * more specifically: offer() and deliver().
+ * It does not affect the ability to start and stop
+ * underlying drools infrastructure
+ */
+ protected volatile boolean locked = false;
+
+ /**
+ * list of topics, each with associated decoder classes, each
+ * with a list of associated filters.
+ */
+ protected List<TopicCoderFilterConfiguration> decoderConfigurations;
+
+ /**
+ * list of topics, each with associated encoder classes, each
+ * with a list of associated filters.
+ */
+ protected List<TopicCoderFilterConfiguration> encoderConfigurations;
+
+ /**
+ * recent source events processed
+ */
+ protected final CircularFifoQueue<Object> recentSourceEvents = new CircularFifoQueue<Object>(10);
+
+ /**
+ * recent sink events processed
+ */
+ protected final CircularFifoQueue<String> recentSinkEvents = new CircularFifoQueue<String>(10);
+
+ /**
+ * original Drools Model/Rules classloader hash
+ */
+ protected int modelClassLoaderHash;
+
+ /**
+ * Expanded version of the constructor
+ *
+ * @param groupId maven group id
+ * @param artifactId maven artifact id
+ * @param version maven version
+ * @param decoderConfiguration list of topic -> decoders -> filters mapping
+ * @param encoderConfiguration list of topic -> encoders -> filters mapping
+ *
+ * @throws IllegalArgumentException invalid arguments passed in
+ */
+ public MavenDroolsController(String groupId,
+ String artifactId,
+ String version,
+ List<TopicCoderFilterConfiguration> decoderConfigurations,
+ List<TopicCoderFilterConfiguration> encoderConfigurations)
+ throws IllegalArgumentException {
+
+ if (logger.isInfoEnabled())
+ logger.info("DROOLS CONTROLLER: instantiation " + this +
+ " -> {" + groupId + ":" + artifactId + ":" + version + "}");
+
+ if (groupId == null || artifactId == null || version == null ||
+ groupId.isEmpty() || artifactId.isEmpty() || version.isEmpty()) {
+ throw new IllegalArgumentException("Missing maven coordinates: " +
+ groupId + ":" + artifactId + ":" +
+ version);
+ }
+
+ this.policyContainer= new PolicyContainer(groupId, artifactId, version);
+ this.init(decoderConfigurations, encoderConfigurations);
+
+ if (logger.isInfoEnabled())
+ logger.info("DROOLS CONTROLLER: instantiation completed " + this);
+ }
+
+ /**
+ * init encoding/decoding configuration
+ * @param decoderConfiguration list of topic -> decoders -> filters mapping
+ * @param encoderConfiguration list of topic -> encoders -> filters mapping
+ */
+ protected void init(List<TopicCoderFilterConfiguration> decoderConfigurations,
+ List<TopicCoderFilterConfiguration> encoderConfigurations) {
+
+ this.decoderConfigurations = decoderConfigurations;
+ this.encoderConfigurations = encoderConfigurations;
+
+ this.initCoders(decoderConfigurations, true);
+ this.initCoders(encoderConfigurations, false);
+
+ this.modelClassLoaderHash = this.policyContainer.getClassLoader().hashCode();
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void updateToVersion(String newGroupId, String newArtifactId, String newVersion,
+ List<TopicCoderFilterConfiguration> decoderConfigurations,
+ List<TopicCoderFilterConfiguration> encoderConfigurations)
+ throws IllegalArgumentException, LinkageError, Exception {
+
+ if (logger.isInfoEnabled())
+ logger.info("UPDATE-TO-VERSION: " + this + " -> {" + newGroupId + ":" + newArtifactId + ":" + newVersion + "}");
+
+ if (newGroupId == null || newArtifactId == null || newVersion == null ||
+ newGroupId.isEmpty() || newArtifactId.isEmpty() || newVersion.isEmpty()) {
+ throw new IllegalArgumentException("Missing maven coordinates: " +
+ newGroupId + ":" + newArtifactId + ":" +
+ newVersion);
+ }
+
+ if (newGroupId.equalsIgnoreCase(DroolsController.NO_GROUP_ID) ||
+ newArtifactId.equalsIgnoreCase(DroolsController.NO_ARTIFACT_ID) ||
+ newVersion.equalsIgnoreCase(DroolsController.NO_VERSION)) {
+ throw new IllegalArgumentException("BRAINLESS maven coordinates provided: " +
+ newGroupId + ":" + newArtifactId + ":" +
+ newVersion);
+ }
+
+ if (newGroupId.equalsIgnoreCase(this.getGroupId()) &&
+ newArtifactId.equalsIgnoreCase(this.getArtifactId()) &&
+ newVersion.equalsIgnoreCase(this.getVersion())) {
+ logger.warn("Al in the right version: " + newGroupId + ":" +
+ newArtifactId + ":" + newVersion + " vs. " + this);
+ return;
+ }
+
+ if (!newGroupId.equalsIgnoreCase(this.getGroupId()) ||
+ !newArtifactId.equalsIgnoreCase(this.getArtifactId())) {
+ throw new IllegalArgumentException("Group ID and Artifact ID maven coordinates must be identical for the upgrade: " +
+ newGroupId + ":" + newArtifactId + ":" +
+ newVersion + " vs. " + this);
+ }
+
+ /* upgrade */
+ String messages = this.policyContainer.updateToVersion(newVersion);
+ if (logger.isWarnEnabled())
+ logger.warn(this + "UPGRADE results: " + messages);
+
+ /*
+ * If all sucessful (can load new container), now we can remove all coders from previous sessions
+ */
+ this.removeCoders();
+
+ /*
+ * add the new coders
+ */
+ this.init(decoderConfigurations, encoderConfigurations);
+
+ if (logger.isInfoEnabled())
+ logger.info("UPDATE-TO-VERSION: completed " + this);
+ }
+
+ /**
+ * initialize decoders for all the topics supported by this controller
+ * Note this is critical to be done after the Policy Container is
+ * instantiated to be able to fetch the corresponding classes.
+ *
+ * @param decoderConfiguration list of topic -> decoders -> filters mapping
+ */
+ protected void initCoders(List<TopicCoderFilterConfiguration> coderConfigurations,
+ boolean decoder)
+ throws IllegalArgumentException {
+
+ if (logger.isInfoEnabled())
+ logger.info("INIT-CODERS: " + this);
+
+ if (coderConfigurations == null) {
+ return;
+ }
+
+
+ for (TopicCoderFilterConfiguration coderConfig: coderConfigurations) {
+ String topic = coderConfig.getTopic();
+
+ CustomGsonCoder customGsonCoder = coderConfig.getCustomGsonCoder();
+ if (coderConfig.getCustomGsonCoder() != null &&
+ coderConfig.getCustomGsonCoder().getClassContainer() != null &&
+ !coderConfig.getCustomGsonCoder().getClassContainer().isEmpty()) {
+
+ String customGsonCoderClass = coderConfig.getCustomGsonCoder().getClassContainer();
+ if (!ReflectionUtil.isClass(this.policyContainer.getClassLoader(),
+ customGsonCoderClass)) {
+ logger.error(customGsonCoderClass + " cannot be retrieved");
+ throw new IllegalArgumentException(customGsonCoderClass + " cannot be retrieved");
+ } else {
+ if (logger.isInfoEnabled())
+ logger.info("CLASS FETCHED " + customGsonCoderClass);
+ }
+ }
+
+ CustomJacksonCoder customJacksonCoder = coderConfig.getCustomJacksonCoder();
+ if (coderConfig.getCustomJacksonCoder() != null &&
+ coderConfig.getCustomJacksonCoder().getClassContainer() != null &&
+ !coderConfig.getCustomJacksonCoder().getClassContainer().isEmpty()) {
+
+ String customJacksonCoderClass = coderConfig.getCustomJacksonCoder().getClassContainer();
+ if (!ReflectionUtil.isClass(this.policyContainer.getClassLoader(),
+ customJacksonCoderClass)) {
+ logger.error(customJacksonCoderClass + " cannot be retrieved");
+ throw new IllegalArgumentException(customJacksonCoderClass + " cannot be retrieved");
+ } else {
+ if (logger.isInfoEnabled())
+ logger.info("CLASS FETCHED " + customJacksonCoderClass);
+ }
+ }
+
+ List<PotentialCoderFilter> coderFilters = coderConfig.getCoderFilters();
+ if (coderFilters == null || coderFilters.isEmpty()) {
+ continue;
+ }
+
+ for (PotentialCoderFilter coderFilter : coderFilters) {
+ String potentialCodedClass = coderFilter.getCodedClass();
+ JsonProtocolFilter protocolFilter = coderFilter.getFilter();
+
+ if (!ReflectionUtil.isClass(this.policyContainer.getClassLoader(),
+ potentialCodedClass)) {
+ logger.error(potentialCodedClass + " cannot be retrieved");
+ throw new IllegalArgumentException(potentialCodedClass + " cannot be retrieved");
+ } else {
+ if (logger.isInfoEnabled())
+ logger.info("CLASS FETCHED " + potentialCodedClass);
+ }
+
+ if (decoder)
+ EventProtocolCoder.manager.addDecoder(this.getGroupId(), this.getArtifactId(),
+ topic, potentialCodedClass, protocolFilter,
+ customGsonCoder,
+ customJacksonCoder,
+ this.policyContainer.getClassLoader().hashCode());
+ else
+ EventProtocolCoder.manager.addEncoder(this.getGroupId(), this.getArtifactId(),
+ topic, potentialCodedClass, protocolFilter,
+ customGsonCoder,
+ customJacksonCoder,
+ this.policyContainer.getClassLoader().hashCode());
+ }
+ }
+ }
+
+
+ /**
+ * remove decoders.
+ */
+ protected void removeDecoders()
+ throws IllegalArgumentException {
+ if (logger.isInfoEnabled())
+ logger.info("REMOVE-DECODERS: " + this);
+
+ if (this.decoderConfigurations == null) {
+ return;
+ }
+
+
+ for (TopicCoderFilterConfiguration coderConfig: decoderConfigurations) {
+ String topic = coderConfig.getTopic();
+ EventProtocolCoder.manager.removeDecoders
+ (this.getGroupId(), this.getArtifactId(), topic);
+ }
+ }
+
+ /**
+ * remove decoders.
+ */
+ protected void removeEncoders()
+ throws IllegalArgumentException {
+
+ if (logger.isInfoEnabled())
+ logger.info("REMOVE-ENCODERS: " + this);
+
+ if (this.encoderConfigurations == null)
+ return;
+
+
+ for (TopicCoderFilterConfiguration coderConfig: encoderConfigurations) {
+ String topic = coderConfig.getTopic();
+ EventProtocolCoder.manager.removeEncoders
+ (this.getGroupId(), this.getArtifactId(), topic);
+ }
+ }
+
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public boolean ownsCoder(Class<? extends Object> coderClass, int modelHash) throws IllegalStateException {
+ if (!ReflectionUtil.isClass
+ (this.policyContainer.getClassLoader(), coderClass.getCanonicalName())) {
+ logger.error(this + coderClass.getCanonicalName() + " cannot be retrieved. ");
+ return false;
+ }
+
+ if (modelHash == this.modelClassLoaderHash) {
+ if (logger.isInfoEnabled())
+ logger.info(coderClass.getCanonicalName() +
+ this + " class loader matches original drools controller rules classloader " +
+ coderClass.getClassLoader());
+ return true;
+ } else {
+ if (logger.isWarnEnabled())
+ logger.warn(this + coderClass.getCanonicalName() + " class loaders don't match " +
+ coderClass.getClassLoader() + " vs " +
+ this.policyContainer.getClassLoader());
+ return false;
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public boolean start() {
+
+ if (logger.isInfoEnabled())
+ logger.info("START: " + this);
+
+ synchronized (this) {
+ if (this.alive)
+ return true;
+
+ this.alive = true;
+ }
+
+ return this.policyContainer.start();
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public boolean stop() {
+
+ logger.info("STOP: " + this);
+
+ synchronized (this) {
+ if (!this.alive)
+ return true;
+
+ this.alive = false;
+ }
+
+ return this.policyContainer.stop();
+ }
+
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void shutdown() throws IllegalStateException {
+
+ if (logger.isInfoEnabled())
+ logger.info(this + "SHUTDOWN");
+
+ try {
+ this.stop();
+ this.removeCoders();
+ } catch (Exception e) {
+ logger.error(MessageCodes.EXCEPTION_ERROR, e, "stop", this.toString());
+ } finally {
+ this.policyContainer.shutdown();
+ }
+
+ }
+
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void halt() throws IllegalStateException {
+ if (logger.isInfoEnabled())
+ logger.info(this + "SHUTDOWN");
+
+ try {
+ this.stop();
+ this.removeCoders();
+ } catch (Exception e) {
+ logger.error(MessageCodes.EXCEPTION_ERROR, e, "halt", this.toString());
+ } finally {
+ this.policyContainer.destroy();
+ }
+ }
+
+ /**
+ * removes this drools controllers and encoders and decoders from operation
+ */
+ protected void removeCoders() {
+
+ if (logger.isInfoEnabled())
+ logger.info(this + "REMOVE-CODERS");
+
+ try {
+ this.removeDecoders();
+ } catch (IllegalArgumentException e) {
+ logger.error(MessageCodes.EXCEPTION_ERROR, e, "removeDecoders", this.toString());
+ }
+
+ try {
+ this.removeEncoders();
+ } catch (IllegalArgumentException e) {
+ logger.error(MessageCodes.EXCEPTION_ERROR, e, "removeEncoders", this.toString());
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public boolean isAlive() {
+ return this.alive;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public boolean offer(String topic, String event) {
+
+ if (logger.isInfoEnabled())
+ logger.info("OFFER: " + topic + ":" + event + " INTO " + this);
+
+ if (this.locked)
+ return true;
+
+ if (!this.alive)
+ return true;
+
+ // 0. Check if the policy container has any sessions
+
+ if (this.policyContainer.getPolicySessions().size() <= 0) {
+ // no sessions
+ return true;
+ }
+
+ // 1. Now, check if this topic has a decoder:
+
+ if (!EventProtocolCoder.manager.isDecodingSupported(this.getGroupId(),
+ this.getArtifactId(),
+ topic)) {
+
+ logger.warn("DECODING-UNSUPPORTED: " + ":" + this.getGroupId() +
+ ":" + this.getArtifactId() + ":" + topic + " IN " + this);
+ return true;
+ }
+
+ // 2. Decode
+
+ Object anEvent;
+ try {
+ anEvent = EventProtocolCoder.manager.decode(this.getGroupId(),
+ this.getArtifactId(),
+ topic,
+ event);
+ } catch (Exception e) {
+ logger.error(MessageCodes.EXCEPTION_ERROR, e,
+ "DECODE:"+ this.getGroupId() + ":" +
+ this.getArtifactId() + ":" + topic + ":" + event,
+ this.toString());
+ return true;
+ }
+
+ synchronized(this.recentSourceEvents) {
+ this.recentSourceEvents.add(anEvent);
+ }
+
+ // increment event count for Nagios monitoring
+ PdpJmx.getInstance().updateOccured();
+
+ // Broadcast
+
+ if (logger.isInfoEnabled())
+ logger.info(this + "BROADCAST-INJECT of " + event + " FROM " + topic + " INTO " + this.policyContainer.getName());
+
+ if (!this.policyContainer.insertAll(anEvent))
+ logger.warn(this + "Failed to inject into PolicyContainer " + this.getSessionNames());
+
+ return true;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public boolean deliver(TopicSink sink, Object event)
+ throws IllegalArgumentException,
+ IllegalStateException,
+ UnsupportedOperationException {
+
+ if (logger.isInfoEnabled())
+ logger.info(this + "DELIVER: " + event + " FROM " + this + " TO " + sink);
+
+ if (sink == null)
+ throw new IllegalArgumentException
+ (this + " invalid sink");
+
+ if (event == null)
+ throw new IllegalArgumentException
+ (this + " invalid event");
+
+ if (this.locked)
+ throw new IllegalStateException
+ (this + " is locked");
+
+ if (!this.alive)
+ throw new IllegalStateException
+ (this + " is stopped");
+
+ String json =
+ EventProtocolCoder.manager.encode(sink.getTopic(), event, this);
+
+ synchronized(this.recentSinkEvents) {
+ this.recentSinkEvents.add(json);
+ }
+
+ return sink.send(json);
+
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public String getVersion() {
+ return this.policyContainer.getVersion();
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public String getArtifactId() {
+ return this.policyContainer.getArtifactId();
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public String getGroupId() {
+ return this.policyContainer.getGroupId();
+ }
+
+ /**
+ * @return the modelClassLoaderHash
+ */
+ public int getModelClassLoaderHash() {
+ return modelClassLoaderHash;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public synchronized boolean lock() {
+ logger.info("LOCK: " + this);
+
+ this.locked = true;
+ return true;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public synchronized boolean unlock() {
+ logger.info("UNLOCK: " + this);
+
+ this.locked = false;
+ return true;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public boolean isLocked() {
+ return this.locked;
+ }
+
+ @Override
+ public List<String> getSessionNames() {
+ List<String> sessionNames = new ArrayList<String>();
+ try {
+ for (PolicySession session: this.policyContainer.getPolicySessions()) {
+ sessionNames.add(session.getFullName());
+ }
+ } catch (Exception e) {
+ logger.warn(MessageCodes.EXCEPTION_ERROR, e,
+ "Can't retrieve POLICY-CORE sessions: " + e.getMessage(),
+ this.toString());
+ sessionNames.add(e.getMessage());
+ }
+ return sessionNames;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public Class<?> fetchModelClass(String className) throws IllegalStateException {
+ Class<?> modelClass =
+ ReflectionUtil.fetchClass(this.policyContainer.getClassLoader(), className);
+ return modelClass;
+ }
+
+ /**
+ * @return the recentSourceEvents
+ */
+ @Override
+ public Object[] getRecentSourceEvents() {
+ synchronized(this.recentSourceEvents) {
+ Object[] events = new Object[recentSourceEvents.size()];
+ return recentSourceEvents.toArray(events);
+ }
+ }
+
+ /**
+ * @return the recentSinkEvents
+ */
+ @Override
+ public String[] getRecentSinkEvents() {
+ synchronized(this.recentSinkEvents) {
+ String[] events = new String[recentSinkEvents.size()];
+ return recentSinkEvents.toArray(events);
+ }
+ }
+
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public boolean isBrained() {
+ return true;
+ }
+
+
+ @Override
+ public String toString() {
+ StringBuilder builder = new StringBuilder();
+ builder.append("MavenDroolsController [policyContainer=")
+ .append((policyContainer != null) ? policyContainer.getName() : "NULL").append(":")
+ .append(", alive=")
+ .append(alive).append(", locked=").append(locked).append(", decoderConfigurations=")
+ .append(decoderConfigurations).append(", encoderConfigurations=").append(encoderConfigurations)
+ .append(", modelClassLoaderHash=").append(modelClassLoaderHash).append("]");
+ return builder.toString();
+ }
+
+}