/*- * ============LICENSE_START======================================================= * policy-management * ================================================================================ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * ============LICENSE_END========================================================= */ package org.openecomp.policy.drools.controller.internal; import java.util.ArrayList; import java.util.List; import org.apache.commons.collections4.queue.CircularFifoQueue; import org.openecomp.policy.common.logging.eelf.MessageCodes; import org.openecomp.policy.common.logging.flexlogger.FlexLogger; import org.openecomp.policy.common.logging.flexlogger.Logger; import org.openecomp.policy.drools.controller.DroolsController; import org.openecomp.policy.drools.core.PolicyContainer; import org.openecomp.policy.drools.core.PolicySession; import org.openecomp.policy.drools.core.jmx.PdpJmx; import org.openecomp.policy.drools.event.comm.TopicSink; import org.openecomp.policy.drools.protocol.coders.EventProtocolCoder; import org.openecomp.policy.drools.protocol.coders.JsonProtocolFilter; import org.openecomp.policy.drools.protocol.coders.TopicCoderFilterConfiguration; import org.openecomp.policy.drools.protocol.coders.TopicCoderFilterConfiguration.CustomGsonCoder; import org.openecomp.policy.drools.protocol.coders.TopicCoderFilterConfiguration.CustomJacksonCoder; import org.openecomp.policy.drools.protocol.coders.TopicCoderFilterConfiguration.PotentialCoderFilter; import org.openecomp.policy.drools.utils.ReflectionUtil; import com.fasterxml.jackson.annotation.JsonIgnore; /** * Maven-based Drools Controller that interacts with the * policy-core PolicyContainer and PolicySession to manage * Drools containers instantiated using Maven. */ public class MavenDroolsController implements DroolsController { /** * logger */ private static Logger logger = FlexLogger.getLogger(MavenDroolsController.class); /** * Policy Container, the access object to the policy-core layer */ @JsonIgnore protected final PolicyContainer policyContainer; /** * alive status of this drools controller, * reflects invocation of start()/stop() only */ protected volatile boolean alive = false; /** * locked status of this drools controller, * reflects if i/o drools related operations are permitted, * more specifically: offer() and deliver(). * It does not affect the ability to start and stop * underlying drools infrastructure */ protected volatile boolean locked = false; /** * list of topics, each with associated decoder classes, each * with a list of associated filters. */ protected List decoderConfigurations; /** * list of topics, each with associated encoder classes, each * with a list of associated filters. */ protected List encoderConfigurations; /** * recent source events processed */ protected final CircularFifoQueue recentSourceEvents = new CircularFifoQueue(10); /** * recent sink events processed */ protected final CircularFifoQueue recentSinkEvents = new CircularFifoQueue(10); /** * original Drools Model/Rules classloader hash */ protected int modelClassLoaderHash; /** * Expanded version of the constructor * * @param groupId maven group id * @param artifactId maven artifact id * @param version maven version * @param decoderConfiguration list of topic -> decoders -> filters mapping * @param encoderConfiguration list of topic -> encoders -> filters mapping * * @throws IllegalArgumentException invalid arguments passed in */ public MavenDroolsController(String groupId, String artifactId, String version, List decoderConfigurations, List encoderConfigurations) throws IllegalArgumentException { if (logger.isInfoEnabled()) logger.info("DROOLS CONTROLLER: instantiation " + this + " -> {" + groupId + ":" + artifactId + ":" + version + "}"); if (groupId == null || artifactId == null || version == null || groupId.isEmpty() || artifactId.isEmpty() || version.isEmpty()) { throw new IllegalArgumentException("Missing maven coordinates: " + groupId + ":" + artifactId + ":" + version); } this.policyContainer= new PolicyContainer(groupId, artifactId, version); this.init(decoderConfigurations, encoderConfigurations); if (logger.isInfoEnabled()) logger.info("DROOLS CONTROLLER: instantiation completed " + this); } /** * init encoding/decoding configuration * @param decoderConfiguration list of topic -> decoders -> filters mapping * @param encoderConfiguration list of topic -> encoders -> filters mapping */ protected void init(List decoderConfigurations, List encoderConfigurations) { this.decoderConfigurations = decoderConfigurations; this.encoderConfigurations = encoderConfigurations; this.initCoders(decoderConfigurations, true); this.initCoders(encoderConfigurations, false); this.modelClassLoaderHash = this.policyContainer.getClassLoader().hashCode(); } /** * {@inheritDoc} */ @Override public void updateToVersion(String newGroupId, String newArtifactId, String newVersion, List decoderConfigurations, List encoderConfigurations) throws IllegalArgumentException, LinkageError, Exception { if (logger.isInfoEnabled()) logger.info("UPDATE-TO-VERSION: " + this + " -> {" + newGroupId + ":" + newArtifactId + ":" + newVersion + "}"); if (newGroupId == null || newArtifactId == null || newVersion == null || newGroupId.isEmpty() || newArtifactId.isEmpty() || newVersion.isEmpty()) { throw new IllegalArgumentException("Missing maven coordinates: " + newGroupId + ":" + newArtifactId + ":" + newVersion); } if (newGroupId.equalsIgnoreCase(DroolsController.NO_GROUP_ID) || newArtifactId.equalsIgnoreCase(DroolsController.NO_ARTIFACT_ID) || newVersion.equalsIgnoreCase(DroolsController.NO_VERSION)) { throw new IllegalArgumentException("BRAINLESS maven coordinates provided: " + newGroupId + ":" + newArtifactId + ":" + newVersion); } if (newGroupId.equalsIgnoreCase(this.getGroupId()) && newArtifactId.equalsIgnoreCase(this.getArtifactId()) && newVersion.equalsIgnoreCase(this.getVersion())) { logger.warn("Al in the right version: " + newGroupId + ":" + newArtifactId + ":" + newVersion + " vs. " + this); return; } if (!newGroupId.equalsIgnoreCase(this.getGroupId()) || !newArtifactId.equalsIgnoreCase(this.getArtifactId())) { throw new IllegalArgumentException("Group ID and Artifact ID maven coordinates must be identical for the upgrade: " + newGroupId + ":" + newArtifactId + ":" + newVersion + " vs. " + this); } /* upgrade */ String messages = this.policyContainer.updateToVersion(newVersion); if (logger.isWarnEnabled()) logger.warn(this + "UPGRADE results: " + messages); /* * If all sucessful (can load new container), now we can remove all coders from previous sessions */ this.removeCoders(); /* * add the new coders */ this.init(decoderConfigurations, encoderConfigurations); if (logger.isInfoEnabled()) logger.info("UPDATE-TO-VERSION: completed " + this); } /** * initialize decoders for all the topics supported by this controller * Note this is critical to be done after the Policy Container is * instantiated to be able to fetch the corresponding classes. * * @param decoderConfiguration list of topic -> decoders -> filters mapping */ protected void initCoders(List coderConfigurations, boolean decoder) throws IllegalArgumentException { if (logger.isInfoEnabled()) logger.info("INIT-CODERS: " + this); if (coderConfigurations == null) { return; } for (TopicCoderFilterConfiguration coderConfig: coderConfigurations) { String topic = coderConfig.getTopic(); CustomGsonCoder customGsonCoder = coderConfig.getCustomGsonCoder(); if (coderConfig.getCustomGsonCoder() != null && coderConfig.getCustomGsonCoder().getClassContainer() != null && !coderConfig.getCustomGsonCoder().getClassContainer().isEmpty()) { String customGsonCoderClass = coderConfig.getCustomGsonCoder().getClassContainer(); if (!ReflectionUtil.isClass(this.policyContainer.getClassLoader(), customGsonCoderClass)) { logger.error(customGsonCoderClass + " cannot be retrieved"); throw new IllegalArgumentException(customGsonCoderClass + " cannot be retrieved"); } else { if (logger.isInfoEnabled()) logger.info("CLASS FETCHED " + customGsonCoderClass); } } CustomJacksonCoder customJacksonCoder = coderConfig.getCustomJacksonCoder(); if (coderConfig.getCustomJacksonCoder() != null && coderConfig.getCustomJacksonCoder().getClassContainer() != null && !coderConfig.getCustomJacksonCoder().getClassContainer().isEmpty()) { String customJacksonCoderClass = coderConfig.getCustomJacksonCoder().getClassContainer(); if (!ReflectionUtil.isClass(this.policyContainer.getClassLoader(), customJacksonCoderClass)) { logger.error(customJacksonCoderClass + " cannot be retrieved"); throw new IllegalArgumentException(customJacksonCoderClass + " cannot be retrieved"); } else { if (logger.isInfoEnabled()) logger.info("CLASS FETCHED " + customJacksonCoderClass); } } List coderFilters = coderConfig.getCoderFilters(); if (coderFilters == null || coderFilters.isEmpty()) { continue; } for (PotentialCoderFilter coderFilter : coderFilters) { String potentialCodedClass = coderFilter.getCodedClass(); JsonProtocolFilter protocolFilter = coderFilter.getFilter(); if (!ReflectionUtil.isClass(this.policyContainer.getClassLoader(), potentialCodedClass)) { logger.error(potentialCodedClass + " cannot be retrieved"); throw new IllegalArgumentException(potentialCodedClass + " cannot be retrieved"); } else { if (logger.isInfoEnabled()) logger.info("CLASS FETCHED " + potentialCodedClass); } if (decoder) EventProtocolCoder.manager.addDecoder(this.getGroupId(), this.getArtifactId(), topic, potentialCodedClass, protocolFilter, customGsonCoder, customJacksonCoder, this.policyContainer.getClassLoader().hashCode()); else EventProtocolCoder.manager.addEncoder(this.getGroupId(), this.getArtifactId(), topic, potentialCodedClass, protocolFilter, customGsonCoder, customJacksonCoder, this.policyContainer.getClassLoader().hashCode()); } } } /** * remove decoders. */ protected void removeDecoders() throws IllegalArgumentException { if (logger.isInfoEnabled()) logger.info("REMOVE-DECODERS: " + this); if (this.decoderConfigurations == null) { return; } for (TopicCoderFilterConfiguration coderConfig: decoderConfigurations) { String topic = coderConfig.getTopic(); EventProtocolCoder.manager.removeDecoders (this.getGroupId(), this.getArtifactId(), topic); } } /** * remove decoders. */ protected void removeEncoders() throws IllegalArgumentException { if (logger.isInfoEnabled()) logger.info("REMOVE-ENCODERS: " + this); if (this.encoderConfigurations == null) return; for (TopicCoderFilterConfiguration coderConfig: encoderConfigurations) { String topic = coderConfig.getTopic(); EventProtocolCoder.manager.removeEncoders (this.getGroupId(), this.getArtifactId(), topic); } } /** * {@inheritDoc} */ @Override public boolean ownsCoder(Class coderClass, int modelHash) throws IllegalStateException { if (!ReflectionUtil.isClass (this.policyContainer.getClassLoader(), coderClass.getCanonicalName())) { logger.error(this + coderClass.getCanonicalName() + " cannot be retrieved. "); return false; } if (modelHash == this.modelClassLoaderHash) { if (logger.isInfoEnabled()) logger.info(coderClass.getCanonicalName() + this + " class loader matches original drools controller rules classloader " + coderClass.getClassLoader()); return true; } else { if (logger.isWarnEnabled()) logger.warn(this + coderClass.getCanonicalName() + " class loaders don't match " + coderClass.getClassLoader() + " vs " + this.policyContainer.getClassLoader()); return false; } } /** * {@inheritDoc} */ @Override public boolean start() { if (logger.isInfoEnabled()) logger.info("START: " + this); synchronized (this) { if (this.alive) return true; this.alive = true; } return this.policyContainer.start(); } /** * {@inheritDoc} */ @Override public boolean stop() { logger.info("STOP: " + this); synchronized (this) { if (!this.alive) return true; this.alive = false; } return this.policyContainer.stop(); } /** * {@inheritDoc} */ @Override public void shutdown() throws IllegalStateException { if (logger.isInfoEnabled()) logger.info(this + "SHUTDOWN"); try { this.stop(); this.removeCoders(); } catch (Exception e) { logger.error(MessageCodes.EXCEPTION_ERROR, e, "stop", this.toString()); } finally { this.policyContainer.shutdown(); } } /** * {@inheritDoc} */ @Override public void halt() throws IllegalStateException { if (logger.isInfoEnabled()) logger.info(this + "SHUTDOWN"); try { this.stop(); this.removeCoders(); } catch (Exception e) { logger.error(MessageCodes.EXCEPTION_ERROR, e, "halt", this.toString()); } finally { this.policyContainer.destroy(); } } /** * removes this drools controllers and encoders and decoders from operation */ protected void removeCoders() { if (logger.isInfoEnabled()) logger.info(this + "REMOVE-CODERS"); try { this.removeDecoders(); } catch (IllegalArgumentException e) { logger.error(MessageCodes.EXCEPTION_ERROR, e, "removeDecoders", this.toString()); } try { this.removeEncoders(); } catch (IllegalArgumentException e) { logger.error(MessageCodes.EXCEPTION_ERROR, e, "removeEncoders", this.toString()); } } /** * {@inheritDoc} */ @Override public boolean isAlive() { return this.alive; } /** * {@inheritDoc} */ @Override public boolean offer(String topic, String event) { if (logger.isInfoEnabled()) logger.info("OFFER: " + topic + ":" + event + " INTO " + this); if (this.locked) return true; if (!this.alive) return true; // 0. Check if the policy container has any sessions if (this.policyContainer.getPolicySessions().size() <= 0) { // no sessions return true; } // 1. Now, check if this topic has a decoder: if (!EventProtocolCoder.manager.isDecodingSupported(this.getGroupId(), this.getArtifactId(), topic)) { logger.warn("DECODING-UNSUPPORTED: " + ":" + this.getGroupId() + ":" + this.getArtifactId() + ":" + topic + " IN " + this); return true; } // 2. Decode Object anEvent; try { anEvent = EventProtocolCoder.manager.decode(this.getGroupId(), this.getArtifactId(), topic, event); } catch (Exception e) { logger.error(MessageCodes.EXCEPTION_ERROR, e, "DECODE:"+ this.getGroupId() + ":" + this.getArtifactId() + ":" + topic + ":" + event, this.toString()); return true; } synchronized(this.recentSourceEvents) { this.recentSourceEvents.add(anEvent); } // increment event count for Nagios monitoring PdpJmx.getInstance().updateOccured(); // Broadcast if (logger.isInfoEnabled()) logger.info(this + "BROADCAST-INJECT of " + event + " FROM " + topic + " INTO " + this.policyContainer.getName()); if (!this.policyContainer.insertAll(anEvent)) logger.warn(this + "Failed to inject into PolicyContainer " + this.getSessionNames()); return true; } /** * {@inheritDoc} */ @Override public boolean deliver(TopicSink sink, Object event) throws IllegalArgumentException, IllegalStateException, UnsupportedOperationException { if (logger.isInfoEnabled()) logger.info(this + "DELIVER: " + event + " FROM " + this + " TO " + sink); if (sink == null) throw new IllegalArgumentException (this + " invalid sink"); if (event == null) throw new IllegalArgumentException (this + " invalid event"); if (this.locked) throw new IllegalStateException (this + " is locked"); if (!this.alive) throw new IllegalStateException (this + " is stopped"); String json = EventProtocolCoder.manager.encode(sink.getTopic(), event, this); synchronized(this.recentSinkEvents) { this.recentSinkEvents.add(json); } return sink.send(json); } /** * {@inheritDoc} */ @Override public String getVersion() { return this.policyContainer.getVersion(); } /** * {@inheritDoc} */ @Override public String getArtifactId() { return this.policyContainer.getArtifactId(); } /** * {@inheritDoc} */ @Override public String getGroupId() { return this.policyContainer.getGroupId(); } /** * @return the modelClassLoaderHash */ public int getModelClassLoaderHash() { return modelClassLoaderHash; } /** * {@inheritDoc} */ @Override public synchronized boolean lock() { logger.info("LOCK: " + this); this.locked = true; return true; } /** * {@inheritDoc} */ @Override public synchronized boolean unlock() { logger.info("UNLOCK: " + this); this.locked = false; return true; } /** * {@inheritDoc} */ @Override public boolean isLocked() { return this.locked; } @Override public List getSessionNames() { List sessionNames = new ArrayList(); try { for (PolicySession session: this.policyContainer.getPolicySessions()) { sessionNames.add(session.getFullName()); } } catch (Exception e) { logger.warn(MessageCodes.EXCEPTION_ERROR, e, "Can't retrieve POLICY-CORE sessions: " + e.getMessage(), this.toString()); sessionNames.add(e.getMessage()); } return sessionNames; } /** * {@inheritDoc} */ @Override public Class fetchModelClass(String className) throws IllegalStateException { Class modelClass = ReflectionUtil.fetchClass(this.policyContainer.getClassLoader(), className); return modelClass; } /** * @return the recentSourceEvents */ @Override public Object[] getRecentSourceEvents() { synchronized(this.recentSourceEvents) { Object[] events = new Object[recentSourceEvents.size()]; return recentSourceEvents.toArray(events); } } /** * @return the recentSinkEvents */ @Override public String[] getRecentSinkEvents() { synchronized(this.recentSinkEvents) { String[] events = new String[recentSinkEvents.size()]; return recentSinkEvents.toArray(events); } } /** * {@inheritDoc} */ @Override public boolean isBrained() { return true; } @Override public String toString() { StringBuilder builder = new StringBuilder(); builder.append("MavenDroolsController [policyContainer=") .append((policyContainer != null) ? policyContainer.getName() : "NULL").append(":") .append(", alive=") .append(alive).append(", locked=").append(locked).append(", decoderConfigurations=") .append(decoderConfigurations).append(", encoderConfigurations=").append(encoderConfigurations) .append(", modelClassLoaderHash=").append(modelClassLoaderHash).append("]"); return builder.toString(); } }