From c733c08b7201ffdee81c7dab2ed50a1ce8fd5bbb Mon Sep 17 00:00:00 2001 From: Pamela Dragosh Date: Thu, 13 Sep 2018 19:35:18 -0400 Subject: Fix checkstyle in policy-management The submodule policy-management checkstyle fixes. There may be one or two sonar fixes in there. Issue-ID: POLICY-882 Change-Id: I9cb43c573c6811dd058943650ba1ea5f6dc880aa Signed-off-by: Pamela Dragosh --- .../controller/internal/MavenDroolsController.java | 1653 ++++++++++---------- 1 file changed, 852 insertions(+), 801 deletions(-) (limited to 'policy-management/src/main/java/org/onap/policy/drools/controller/internal/MavenDroolsController.java') diff --git a/policy-management/src/main/java/org/onap/policy/drools/controller/internal/MavenDroolsController.java b/policy-management/src/main/java/org/onap/policy/drools/controller/internal/MavenDroolsController.java index 7b44817a..707a8e77 100644 --- a/policy-management/src/main/java/org/onap/policy/drools/controller/internal/MavenDroolsController.java +++ b/policy-management/src/main/java/org/onap/policy/drools/controller/internal/MavenDroolsController.java @@ -35,11 +35,11 @@ import org.kie.api.runtime.KieSession; import org.kie.api.runtime.rule.FactHandle; import org.kie.api.runtime.rule.QueryResults; import org.kie.api.runtime.rule.QueryResultsRow; +import org.onap.policy.common.endpoints.event.comm.TopicSink; import org.onap.policy.drools.controller.DroolsController; import org.onap.policy.drools.core.PolicyContainer; import org.onap.policy.drools.core.PolicySession; import org.onap.policy.drools.core.jmx.PdpJmx; -import org.onap.policy.common.endpoints.event.comm.TopicSink; import org.onap.policy.drools.features.DroolsControllerFeatureAPI; import org.onap.policy.drools.protocol.coders.EventProtocolCoder; import org.onap.policy.drools.protocol.coders.JsonProtocolFilter; @@ -57,806 +57,857 @@ import org.slf4j.LoggerFactory; * Drools containers instantiated using Maven. */ public class MavenDroolsController implements DroolsController { - - /** - * logger - */ - private static Logger logger = LoggerFactory.getLogger(MavenDroolsController.class); - - /** - * Policy Container, the access object to the policy-core layer - */ - @JsonIgnore - protected final PolicyContainer policyContainer; - - /** - * alive status of this drools controller, - * reflects invocation of start()/stop() only - */ - protected volatile boolean alive = false; - - /** - * locked status of this drools controller, - * reflects if i/o drools related operations are permitted, - * more specifically: offer() and deliver(). - * It does not affect the ability to start and stop - * underlying drools infrastructure - */ - protected volatile boolean locked = false; - - /** - * list of topics, each with associated decoder classes, each - * with a list of associated filters. - */ - protected List decoderConfigurations; - - /** - * list of topics, each with associated encoder classes, each - * with a list of associated filters. - */ - protected List encoderConfigurations; - - /** - * recent source events processed - */ - protected final CircularFifoQueue recentSourceEvents = new CircularFifoQueue<>(10); - - /** - * recent sink events processed - */ - protected final CircularFifoQueue recentSinkEvents = new CircularFifoQueue<>(10); - - /** - * original Drools Model/Rules classloader hash - */ - protected int modelClassLoaderHash; - - /** - * Expanded version of the constructor - * - * @param groupId maven group id - * @param artifactId maven artifact id - * @param version maven version - * @param decoderConfigurations list of topic -> decoders -> filters mapping - * @param encoderConfigurations list of topic -> encoders -> filters mapping - * - * @throws IllegalArgumentException invalid arguments passed in - */ - public MavenDroolsController(String groupId, - String artifactId, - String version, - List decoderConfigurations, - List encoderConfigurations) { - - logger.info("drools-controller instantiation [{}:{}:{}]", groupId, artifactId, version); - - if (groupId == null || groupId.isEmpty()) - throw new IllegalArgumentException("Missing maven group-id coordinate"); - - if (artifactId == null || artifactId.isEmpty()) - throw new IllegalArgumentException("Missing maven artifact-id coordinate"); - - if (version == null || version.isEmpty()) - throw new IllegalArgumentException("Missing maven version coordinate"); - - this.policyContainer= new PolicyContainer(groupId, artifactId, version); - this.init(decoderConfigurations, encoderConfigurations); - - logger.debug("{}: instantiation completed ", this); - } - - /** - * init encoding/decoding configuration - * @param decoderConfigurations list of topic -> decoders -> filters mapping - * @param encoderConfigurations list of topic -> encoders -> filters mapping - */ - protected void init(List decoderConfigurations, - List encoderConfigurations) { - - this.decoderConfigurations = decoderConfigurations; - this.encoderConfigurations = encoderConfigurations; - - this.initCoders(decoderConfigurations, true); - this.initCoders(encoderConfigurations, false); - - this.modelClassLoaderHash = this.policyContainer.getClassLoader().hashCode(); - } - - @Override - public void updateToVersion(String newGroupId, String newArtifactId, String newVersion, - List decoderConfigurations, - List encoderConfigurations) - throws LinkageError { - - logger.info("{}: updating version -> [{}:{}:{}]", newGroupId, newArtifactId, newVersion); - - if (newGroupId == null || newGroupId.isEmpty()) - throw new IllegalArgumentException("Missing maven group-id coordinate"); - - if (newArtifactId == null || newArtifactId.isEmpty()) - throw new IllegalArgumentException("Missing maven artifact-id coordinate"); - - if (newVersion == null || newVersion.isEmpty()) - throw new IllegalArgumentException("Missing maven version coordinate"); - - if (newGroupId.equalsIgnoreCase(DroolsController.NO_GROUP_ID) || - newArtifactId.equalsIgnoreCase(DroolsController.NO_ARTIFACT_ID) || - newVersion.equalsIgnoreCase(DroolsController.NO_VERSION)) { - throw new IllegalArgumentException("BRAINLESS maven coordinates provided: " + - newGroupId + ":" + newArtifactId + ":" + - newVersion); - } - - if (newGroupId.equalsIgnoreCase(this.getGroupId()) && - newArtifactId.equalsIgnoreCase(this.getArtifactId()) && - newVersion.equalsIgnoreCase(this.getVersion())) { - logger.warn("Al in the right version: " + newGroupId + ":" + - newArtifactId + ":" + newVersion + " vs. " + this); - return; - } - - if (!newGroupId.equalsIgnoreCase(this.getGroupId()) || - !newArtifactId.equalsIgnoreCase(this.getArtifactId())) { - throw new IllegalArgumentException("Group ID and Artifact ID maven coordinates must be identical for the upgrade: " + - newGroupId + ":" + newArtifactId + ":" + - newVersion + " vs. " + this); - } - - /* upgrade */ - String messages = this.policyContainer.updateToVersion(newVersion); - if (logger.isWarnEnabled()) - logger.warn(this + "UPGRADE results: " + messages); - - /* - * If all sucessful (can load new container), now we can remove all coders from previous sessions - */ - this.removeCoders(); - - /* - * add the new coders - */ - this.init(decoderConfigurations, encoderConfigurations); - - if (logger.isInfoEnabled()) - logger.info("UPDATE-TO-VERSION: completed " + this); - } - - /** - * initialize decoders for all the topics supported by this controller - * Note this is critical to be done after the Policy Container is - * instantiated to be able to fetch the corresponding classes. - * - * @param coderConfigurations list of topic -> decoders -> filters mapping - */ - protected void initCoders(List coderConfigurations, - boolean decoder) { - - if (logger.isInfoEnabled()) - logger.info("INIT-CODERS: " + this); - - if (coderConfigurations == null) { - return; - } - - - for (TopicCoderFilterConfiguration coderConfig: coderConfigurations) { - String topic = coderConfig.getTopic(); - - CustomGsonCoder customGsonCoder = coderConfig.getCustomGsonCoder(); - if (coderConfig.getCustomGsonCoder() != null && - coderConfig.getCustomGsonCoder().getClassContainer() != null && - !coderConfig.getCustomGsonCoder().getClassContainer().isEmpty()) { - - String customGsonCoderClass = coderConfig.getCustomGsonCoder().getClassContainer(); - if (!ReflectionUtil.isClass(this.policyContainer.getClassLoader(), - customGsonCoderClass)) { - throw makeRetrieveEx(customGsonCoderClass); - } else { - if (logger.isInfoEnabled()) - logClassFetched(customGsonCoderClass); - } - } - - CustomJacksonCoder customJacksonCoder = coderConfig.getCustomJacksonCoder(); - if (coderConfig.getCustomJacksonCoder() != null && - coderConfig.getCustomJacksonCoder().getClassContainer() != null && - !coderConfig.getCustomJacksonCoder().getClassContainer().isEmpty()) { - - String customJacksonCoderClass = coderConfig.getCustomJacksonCoder().getClassContainer(); - if (!ReflectionUtil.isClass(this.policyContainer.getClassLoader(), - customJacksonCoderClass)) { - throw makeRetrieveEx(customJacksonCoderClass); - } else { - if (logger.isInfoEnabled()) - logClassFetched(customJacksonCoderClass); - } - } - - List coderFilters = coderConfig.getCoderFilters(); - if (coderFilters == null || coderFilters.isEmpty()) { - continue; - } - - for (PotentialCoderFilter coderFilter : coderFilters) { - String potentialCodedClass = coderFilter.getCodedClass(); - JsonProtocolFilter protocolFilter = coderFilter.getFilter(); - - if (!ReflectionUtil.isClass(this.policyContainer.getClassLoader(), - potentialCodedClass)) { - throw makeRetrieveEx(potentialCodedClass); - } else { - if (logger.isInfoEnabled()) - logClassFetched(potentialCodedClass); - } - - if (decoder) - EventProtocolCoder.manager.addDecoder(this.getGroupId(), this.getArtifactId(), - topic, potentialCodedClass, protocolFilter, - customGsonCoder, - customJacksonCoder, - this.policyContainer.getClassLoader().hashCode()); - else - EventProtocolCoder.manager.addEncoder(this.getGroupId(), this.getArtifactId(), - topic, potentialCodedClass, protocolFilter, - customGsonCoder, - customJacksonCoder, - this.policyContainer.getClassLoader().hashCode()); - } - } - } - - /** - * Logs an error and makes an exception for an item that cannot be retrieved. - * @param itemName - * @return a new exception - */ - private IllegalArgumentException makeRetrieveEx(String itemName) { - logger.error(itemName + " cannot be retrieved"); - return new IllegalArgumentException(itemName + " cannot be retrieved"); - } - - /** - * Logs the name of the class that was fetched. - * @param className - */ - private void logClassFetched(String className) { - logger.info("CLASS FETCHED " + className); - } - - - /** - * remove decoders. - */ - protected void removeDecoders(){ - if (logger.isInfoEnabled()) - logger.info("REMOVE-DECODERS: " + this); - - if (this.decoderConfigurations == null) { - return; - } - - - for (TopicCoderFilterConfiguration coderConfig: decoderConfigurations) { - String topic = coderConfig.getTopic(); - EventProtocolCoder.manager.removeDecoders - (this.getGroupId(), this.getArtifactId(), topic); - } - } - - /** - * remove decoders. - */ - protected void removeEncoders() { - - if (logger.isInfoEnabled()) - logger.info("REMOVE-ENCODERS: " + this); - - if (this.encoderConfigurations == null) - return; - - - for (TopicCoderFilterConfiguration coderConfig: encoderConfigurations) { - String topic = coderConfig.getTopic(); - EventProtocolCoder.manager.removeEncoders - (this.getGroupId(), this.getArtifactId(), topic); - } - } - - - @Override - public boolean ownsCoder(Class coderClass, int modelHash) { - if (!ReflectionUtil.isClass - (this.policyContainer.getClassLoader(), coderClass.getCanonicalName())) { - logger.error(this + coderClass.getCanonicalName() + " cannot be retrieved. "); - return false; - } - - if (modelHash == this.modelClassLoaderHash) { - if (logger.isInfoEnabled()) - logger.info(coderClass.getCanonicalName() + - this + " class loader matches original drools controller rules classloader " + - coderClass.getClassLoader()); - return true; - } else { - if (logger.isWarnEnabled()) - logger.warn(this + coderClass.getCanonicalName() + " class loaders don't match " + - coderClass.getClassLoader() + " vs " + - this.policyContainer.getClassLoader()); - return false; - } - } - - @Override - public boolean start() { - - if (logger.isInfoEnabled()) - logger.info("START: " + this); - - synchronized (this) { - if (this.alive) - return true; - - this.alive = true; - } - - return this.policyContainer.start(); - } - - @Override - public boolean stop() { - - logger.info("STOP: " + this); - - synchronized (this) { - if (!this.alive) - return true; - - this.alive = false; - } - - return this.policyContainer.stop(); - } - - @Override - public void shutdown() { - logger.info("{}: SHUTDOWN", this); - - try { - this.stop(); - this.removeCoders(); - } catch (Exception e) { - logger.error("{} SHUTDOWN FAILED because of {}", this, e.getMessage(), e); - } finally { - this.policyContainer.shutdown(); - } - - } - - @Override - public void halt() { - logger.info("{}: HALT", this); - - try { - this.stop(); - this.removeCoders(); - } catch (Exception e) { - logger.error("{} HALT FAILED because of {}", this, e.getMessage(), e); - } finally { - this.policyContainer.destroy(); - } - } - - /** - * removes this drools controllers and encoders and decoders from operation - */ - protected void removeCoders() { - logger.info("{}: REMOVE-CODERS", this); - - try { - this.removeDecoders(); - } catch (IllegalArgumentException e) { - logger.error("{} REMOVE-DECODERS FAILED because of {}", this, e.getMessage(), e); - } - - try { - this.removeEncoders(); - } catch (IllegalArgumentException e) { - logger.error("{} REMOVE-ENCODERS FAILED because of {}", this, e.getMessage(), e); - } - } - - @Override - public boolean isAlive() { - return this.alive; - } - - @Override - public boolean offer(String topic, String event) { - logger.debug("{}: OFFER: {} <- {}", this, topic, event); - - if (this.locked) - return true; - - if (!this.alive) - return true; - - // 0. Check if the policy container has any sessions - - if (this.policyContainer.getPolicySessions().isEmpty()) { - // no sessions - return true; - } - - // 1. Now, check if this topic has a decoder: - - if (!EventProtocolCoder.manager.isDecodingSupported(this.getGroupId(), - this.getArtifactId(), - topic)) { - - logger.warn("{}: DECODING-UNSUPPORTED {}:{}:{}", this, - topic, this.getGroupId(), this.getArtifactId()); - return true; - } - - // 2. Decode - - Object anEvent; - try { - anEvent = EventProtocolCoder.manager.decode(this.getGroupId(), - this.getArtifactId(), - topic, - event); - } catch (UnsupportedOperationException uoe) { - logger.debug("{}: DECODE FAILED: {} <- {} because of {}", this, topic, - event, uoe.getMessage(), uoe); - return true; - } catch (Exception e) { - logger.warn("{}: DECODE FAILED: {} <- {} because of {}", this, topic, - event, e.getMessage(), e); - return true; - } - - synchronized(this.recentSourceEvents) { - this.recentSourceEvents.add(anEvent); - } - - // increment event count for Nagios monitoring - PdpJmx.getInstance().updateOccured(); - - // Broadcast - - if (logger.isInfoEnabled()) - logger.info(this + "BROADCAST-INJECT of " + event + " FROM " + topic + " INTO " + this.policyContainer.getName()); - - for (DroolsControllerFeatureAPI feature : DroolsControllerFeatureAPI.providers.getList()) { - try { - if (feature.beforeInsert(this, anEvent)) - return true; - } catch (Exception e) { - logger.error("{}: feature {} before-insert failure because of {}", - this, feature.getClass().getName(), e.getMessage(), e); - } - } - - boolean successInject = this.policyContainer.insertAll(anEvent); - if (!successInject) - logger.warn(this + "Failed to inject into PolicyContainer " + this.getSessionNames()); - - for (DroolsControllerFeatureAPI feature : DroolsControllerFeatureAPI.providers.getList()) { - try { - if (feature.afterInsert(this, anEvent, successInject)) - return true; - } catch (Exception e) { - logger.error("{}: feature {} after-insert failure because of {}", - this, feature.getClass().getName(), e.getMessage(), e); - } - } - - return true; - } - - @Override - public boolean deliver(TopicSink sink, Object event) { - - if (logger.isInfoEnabled()) - logger.info(this + "DELIVER: " + event + " FROM " + this + " TO " + sink); - - if (sink == null) - throw new IllegalArgumentException - (this + " invalid sink"); - - if (event == null) - throw new IllegalArgumentException - (this + " invalid event"); - - if (this.locked) - throw new IllegalStateException - (this + " is locked"); - - if (!this.alive) - throw new IllegalStateException - (this + " is stopped"); - - String json = - EventProtocolCoder.manager.encode(sink.getTopic(), event, this); - - synchronized(this.recentSinkEvents) { - this.recentSinkEvents.add(json); - } - - return sink.send(json); - - } - - @Override - public String getVersion() { - return this.policyContainer.getVersion(); - } - - @Override - public String getArtifactId() { - return this.policyContainer.getArtifactId(); - } - - @Override - public String getGroupId() { - return this.policyContainer.getGroupId(); - } - - /** - * @return the modelClassLoaderHash - */ - public int getModelClassLoaderHash() { - return modelClassLoaderHash; - } - - @Override - public synchronized boolean lock() { - logger.info("LOCK: " + this); - - this.locked = true; - return true; - } - - @Override - public synchronized boolean unlock() { - logger.info("UNLOCK: " + this); - - this.locked = false; - return true; - } - - @Override - public boolean isLocked() { - return this.locked; - } - - @JsonIgnore - @Override - public PolicyContainer getContainer() { - return this.policyContainer; - } - - @JsonProperty("sessions") - @Override - public List getSessionNames() { - return getSessionNames(true); - } - - @JsonProperty("sessionCoordinates") - @Override - public List getCanonicalSessionNames() { - return getSessionNames(false); - } - - /** - * get session names - * @param abbreviated true for the short form, otherwise the long form - * @return session names - */ - protected List getSessionNames(boolean abbreviated) { - List sessionNames = new ArrayList<>(); - try { - for (PolicySession session: this.policyContainer.getPolicySessions()) { - if (abbreviated) - sessionNames.add(session.getName()); - else - sessionNames.add(session.getFullName()); - } - } catch (Exception e) { - logger.warn("Can't retrieve CORE sessions: " + e.getMessage(), e); - sessionNames.add(e.getMessage()); - } - return sessionNames; - } - - /** - * provides the underlying core layer container sessions - * - * @return the attached Policy Container - */ - protected List getSessions() { - List sessions = new ArrayList<>(); - sessions.addAll(this.policyContainer.getPolicySessions()); - return sessions; - } - - /** - * provides the underlying core layer container session with name sessionName - * - * @param sessionName session name - * @return the attached Policy Container - * @throws IllegalArgumentException when an invalid session name is provided - * @throws IllegalStateException when the drools controller is in an invalid state - */ - protected PolicySession getSession(String sessionName) { - if (sessionName == null || sessionName.isEmpty()) - throw new IllegalArgumentException("A Session Name must be provided"); - - List sessions = this.getSessions(); - for (PolicySession session : sessions) { - if (sessionName.equals(session.getName()) || sessionName.equals(session.getFullName())) - return session; - } - - throw invalidSessNameEx(sessionName); - } - - private IllegalArgumentException invalidSessNameEx(String sessionName) { - return new IllegalArgumentException("Invalid Session Name: " + sessionName); - } - - @Override - public Map factClassNames(String sessionName) { - if (sessionName == null || sessionName.isEmpty()) - throw invalidSessNameEx(sessionName); - - Map classNames = new HashMap<>(); - - PolicySession session = getSession(sessionName); - KieSession kieSession = session.getKieSession(); - - Collection facts = session.getKieSession().getFactHandles(); - for (FactHandle fact : facts) { - try { - String className = kieSession.getObject(fact).getClass().getName(); - if (classNames.containsKey(className)) - classNames.put(className, classNames.get(className) + 1); - else - classNames.put(className, 1); - } catch (Exception e) { - logger.warn("Object cannot be retrieved from fact {}", fact, e); - } - } - - return classNames; - } - - @Override - public long factCount(String sessionName) { - if (sessionName == null || sessionName.isEmpty()) - throw invalidSessNameEx(sessionName); - - PolicySession session = getSession(sessionName); - return session.getKieSession().getFactCount(); - } - - @Override - public List facts(String sessionName, String className, boolean delete) { - if (sessionName == null || sessionName.isEmpty()) - throw invalidSessNameEx(sessionName); - - if (className == null || className.isEmpty()) - throw new IllegalArgumentException("Invalid Class Name: " + className); - - Class factClass = - ReflectionUtil.fetchClass(this.policyContainer.getClassLoader(), className); - if (factClass == null) - throw new IllegalArgumentException("Class cannot be fetched in model's classloader: " + className); - - PolicySession session = getSession(sessionName); - KieSession kieSession = session.getKieSession(); - - List factObjects = new ArrayList<>(); - - Collection factHandles = kieSession.getFactHandles(new ClassObjectFilter(factClass)); - for (FactHandle factHandle : factHandles) { - try { - factObjects.add(kieSession.getObject(factHandle)); - if (delete) - kieSession.delete(factHandle); - } catch (Exception e) { - logger.warn("Object cannot be retrieved from fact {}", factHandle, e); - } - } - - return factObjects; - } - - @Override - public List factQuery(String sessionName, String queryName, String queriedEntity, boolean delete, Object... queryParams) { - if (sessionName == null || sessionName.isEmpty()) - throw invalidSessNameEx(sessionName); - - if (queryName == null || queryName.isEmpty()) - throw new IllegalArgumentException("Invalid Query Name: " + queryName); - - if (queriedEntity == null || queriedEntity.isEmpty()) - throw new IllegalArgumentException("Invalid Queried Entity: " + queriedEntity); - - PolicySession session = getSession(sessionName); - KieSession kieSession = session.getKieSession(); - - boolean found = false; - for (KiePackage kiePackage : kieSession.getKieBase().getKiePackages()) { - for (Query q : kiePackage.getQueries()) { - if (q.getName() != null && q.getName().equals(queryName)) { - found = true; - break; - } - } - } - if (!found) - throw new IllegalArgumentException("Invalid Query Name: " + queryName); + + /** + * logger. + */ + private static Logger logger = LoggerFactory.getLogger(MavenDroolsController.class); + + /** + * Policy Container, the access object to the policy-core layer. + */ + @JsonIgnore + protected final PolicyContainer policyContainer; + + /** + * alive status of this drools controller, + * reflects invocation of start()/stop() only. + */ + protected volatile boolean alive = false; + + /** + * locked status of this drools controller, + * reflects if i/o drools related operations are permitted, + * more specifically: offer() and deliver(). + * It does not affect the ability to start and stop + * underlying drools infrastructure + */ + protected volatile boolean locked = false; + + /** + * list of topics, each with associated decoder classes, each + * with a list of associated filters. + */ + protected List decoderConfigurations; + + /** + * list of topics, each with associated encoder classes, each + * with a list of associated filters. + */ + protected List encoderConfigurations; + + /** + * recent source events processed. + */ + protected final CircularFifoQueue recentSourceEvents = new CircularFifoQueue<>(10); + + /** + * recent sink events processed. + */ + protected final CircularFifoQueue recentSinkEvents = new CircularFifoQueue<>(10); + + /** + * original Drools Model/Rules classloader hash. + */ + protected int modelClassLoaderHash; + + /** + * Expanded version of the constructor. + * + * @param groupId maven group id + * @param artifactId maven artifact id + * @param version maven version + * @param decoderConfigurations list of topic -> decoders -> filters mapping + * @param encoderConfigurations list of topic -> encoders -> filters mapping + * + * @throws IllegalArgumentException invalid arguments passed in + */ + public MavenDroolsController(String groupId, + String artifactId, + String version, + List decoderConfigurations, + List encoderConfigurations) { + + logger.info("drools-controller instantiation [{}:{}:{}]", groupId, artifactId, version); + + if (groupId == null || groupId.isEmpty()) { + throw new IllegalArgumentException("Missing maven group-id coordinate"); + } + + if (artifactId == null || artifactId.isEmpty()) { + throw new IllegalArgumentException("Missing maven artifact-id coordinate"); + } + + if (version == null || version.isEmpty()) { + throw new IllegalArgumentException("Missing maven version coordinate"); + } + + this.policyContainer = new PolicyContainer(groupId, artifactId, version); + this.init(decoderConfigurations, encoderConfigurations); + + logger.debug("{}: instantiation completed ", this); + } + + /** + * init encoding/decoding configuration. + * + * @param decoderConfigurations list of topic -> decoders -> filters mapping + * @param encoderConfigurations list of topic -> encoders -> filters mapping + */ + protected void init(List decoderConfigurations, + List encoderConfigurations) { + + this.decoderConfigurations = decoderConfigurations; + this.encoderConfigurations = encoderConfigurations; + + this.initCoders(decoderConfigurations, true); + this.initCoders(encoderConfigurations, false); + + this.modelClassLoaderHash = this.policyContainer.getClassLoader().hashCode(); + } + + @Override + public void updateToVersion(String newGroupId, String newArtifactId, String newVersion, + List decoderConfigurations, + List encoderConfigurations) + throws LinkageError { + + logger.info("updating version -> [{}:{}:{}]", newGroupId, newArtifactId, newVersion); + + if (newGroupId == null || newGroupId.isEmpty()) { + throw new IllegalArgumentException("Missing maven group-id coordinate"); + } + + if (newArtifactId == null || newArtifactId.isEmpty()) { + throw new IllegalArgumentException("Missing maven artifact-id coordinate"); + } + + if (newVersion == null || newVersion.isEmpty()) { + throw new IllegalArgumentException("Missing maven version coordinate"); + } + + if (newGroupId.equalsIgnoreCase(DroolsController.NO_GROUP_ID) + || newArtifactId.equalsIgnoreCase(DroolsController.NO_ARTIFACT_ID) + || newVersion.equalsIgnoreCase(DroolsController.NO_VERSION)) { + throw new IllegalArgumentException("BRAINLESS maven coordinates provided: " + + newGroupId + ":" + newArtifactId + ":" + + newVersion); + } + + if (newGroupId.equalsIgnoreCase(this.getGroupId()) + && newArtifactId.equalsIgnoreCase(this.getArtifactId()) + && newVersion.equalsIgnoreCase(this.getVersion())) { + logger.warn("Al in the right version: " + newGroupId + ":" + + newArtifactId + ":" + newVersion + " vs. " + this); + return; + } + + if (!newGroupId.equalsIgnoreCase(this.getGroupId()) + || !newArtifactId.equalsIgnoreCase(this.getArtifactId())) { + throw new IllegalArgumentException( + "Group ID and Artifact ID maven coordinates must be identical for the upgrade: " + + newGroupId + ":" + newArtifactId + ":" + + newVersion + " vs. " + this); + } + + /* upgrade */ + String messages = this.policyContainer.updateToVersion(newVersion); + if (logger.isWarnEnabled()) { + logger.warn("{} UPGRADE results: {}", this, messages); + } + + /* + * If all sucessful (can load new container), now we can remove all coders from previous sessions + */ + this.removeCoders(); + + /* + * add the new coders + */ + this.init(decoderConfigurations, encoderConfigurations); + + if (logger.isInfoEnabled()) { + logger.info("UPDATE-TO-VERSION: completed " + this); + } + } + + /** + * initialize decoders for all the topics supported by this controller + * Note this is critical to be done after the Policy Container is + * instantiated to be able to fetch the corresponding classes. + * + * @param coderConfigurations list of topic -> decoders -> filters mapping + */ + protected void initCoders(List coderConfigurations, + boolean decoder) { + + if (logger.isInfoEnabled()) { + logger.info("INIT-CODERS: " + this); + } + + if (coderConfigurations == null) { + return; + } + + + for (TopicCoderFilterConfiguration coderConfig: coderConfigurations) { + String topic = coderConfig.getTopic(); + + CustomGsonCoder customGsonCoder = coderConfig.getCustomGsonCoder(); + if (coderConfig.getCustomGsonCoder() != null + && coderConfig.getCustomGsonCoder().getClassContainer() != null + && !coderConfig.getCustomGsonCoder().getClassContainer().isEmpty()) { + + String customGsonCoderClass = coderConfig.getCustomGsonCoder().getClassContainer(); + if (!ReflectionUtil.isClass(this.policyContainer.getClassLoader(), + customGsonCoderClass)) { + throw makeRetrieveEx(customGsonCoderClass); + } else { + if (logger.isInfoEnabled()) { + logClassFetched(customGsonCoderClass); + } + } + } + + CustomJacksonCoder customJacksonCoder = coderConfig.getCustomJacksonCoder(); + if (coderConfig.getCustomJacksonCoder() != null + && coderConfig.getCustomJacksonCoder().getClassContainer() != null + && !coderConfig.getCustomJacksonCoder().getClassContainer().isEmpty()) { + + String customJacksonCoderClass = coderConfig.getCustomJacksonCoder().getClassContainer(); + if (!ReflectionUtil.isClass(this.policyContainer.getClassLoader(), + customJacksonCoderClass)) { + throw makeRetrieveEx(customJacksonCoderClass); + } else { + if (logger.isInfoEnabled()) { + logClassFetched(customJacksonCoderClass); + } + } + } + + List coderFilters = coderConfig.getCoderFilters(); + if (coderFilters == null || coderFilters.isEmpty()) { + continue; + } + + for (PotentialCoderFilter coderFilter : coderFilters) { + String potentialCodedClass = coderFilter.getCodedClass(); + JsonProtocolFilter protocolFilter = coderFilter.getFilter(); + + if (!ReflectionUtil.isClass(this.policyContainer.getClassLoader(), + potentialCodedClass)) { + throw makeRetrieveEx(potentialCodedClass); + } else { + if (logger.isInfoEnabled()) { + logClassFetched(potentialCodedClass); + } + } + + if (decoder) { + EventProtocolCoder.manager.addDecoder(this.getGroupId(), this.getArtifactId(), + topic, potentialCodedClass, protocolFilter, + customGsonCoder, + customJacksonCoder, + this.policyContainer.getClassLoader().hashCode()); + } else { + EventProtocolCoder.manager.addEncoder(this.getGroupId(), this.getArtifactId(), + topic, potentialCodedClass, protocolFilter, + customGsonCoder, + customJacksonCoder, + this.policyContainer.getClassLoader().hashCode()); + } + } + } + } + + /** + * Logs an error and makes an exception for an item that cannot be retrieved. + * @param itemName the item to retrieve + * @return a new exception + */ + private IllegalArgumentException makeRetrieveEx(String itemName) { + logger.error("{} cannot be retrieved", itemName); + return new IllegalArgumentException(itemName + " cannot be retrieved"); + } + + /** + * Logs the name of the class that was fetched. + * @param className class name fetched + */ + private void logClassFetched(String className) { + logger.info("CLASS FETCHED {}", className); + } + + + /** + * remove decoders. + */ + protected void removeDecoders() { + if (logger.isInfoEnabled()) { + logger.info("REMOVE-DECODERS: {}", this); + } + + if (this.decoderConfigurations == null) { + return; + } + + + for (TopicCoderFilterConfiguration coderConfig: decoderConfigurations) { + String topic = coderConfig.getTopic(); + EventProtocolCoder.manager.removeDecoders(this.getGroupId(), this.getArtifactId(), topic); + } + } + + /** + * remove decoders. + */ + protected void removeEncoders() { + + if (logger.isInfoEnabled()) { + logger.info("REMOVE-ENCODERS: {}", this); + } + + if (this.encoderConfigurations == null) { + return; + } + + for (TopicCoderFilterConfiguration coderConfig: encoderConfigurations) { + String topic = coderConfig.getTopic(); + EventProtocolCoder.manager.removeEncoders(this.getGroupId(), this.getArtifactId(), topic); + } + } + + + @Override + public boolean ownsCoder(Class coderClass, int modelHash) { + if (!ReflectionUtil.isClass(this.policyContainer.getClassLoader(), coderClass.getCanonicalName())) { + logger.error("{}{} cannot be retrieved. ", this, coderClass.getCanonicalName()); + return false; + } + + if (modelHash == this.modelClassLoaderHash) { + if (logger.isInfoEnabled()) { + logger.info(coderClass.getCanonicalName() + + this + " class loader matches original drools controller rules classloader " + + coderClass.getClassLoader()); + } + return true; + } else { + if (logger.isWarnEnabled()) { + logger.warn(this + coderClass.getCanonicalName() + " class loaders don't match " + + coderClass.getClassLoader() + " vs " + + this.policyContainer.getClassLoader()); + } + return false; + } + } + + @Override + public boolean start() { + + if (logger.isInfoEnabled()) { + logger.info("START: {}", this); + } + + synchronized (this) { + if (this.alive) { + return true; + } + this.alive = true; + } + + return this.policyContainer.start(); + } + + @Override + public boolean stop() { + + logger.info("STOP: {}", this); + + synchronized (this) { + if (!this.alive) { + return true; + } + this.alive = false; + } + + return this.policyContainer.stop(); + } + + @Override + public void shutdown() { + logger.info("{}: SHUTDOWN", this); + + try { + this.stop(); + this.removeCoders(); + } catch (Exception e) { + logger.error("{} SHUTDOWN FAILED because of {}", this, e.getMessage(), e); + } finally { + this.policyContainer.shutdown(); + } + + } + + @Override + public void halt() { + logger.info("{}: HALT", this); + + try { + this.stop(); + this.removeCoders(); + } catch (Exception e) { + logger.error("{} HALT FAILED because of {}", this, e.getMessage(), e); + } finally { + this.policyContainer.destroy(); + } + } + + /** + * removes this drools controllers and encoders and decoders from operation. + */ + protected void removeCoders() { + logger.info("{}: REMOVE-CODERS", this); + + try { + this.removeDecoders(); + } catch (IllegalArgumentException e) { + logger.error("{} REMOVE-DECODERS FAILED because of {}", this, e.getMessage(), e); + } + + try { + this.removeEncoders(); + } catch (IllegalArgumentException e) { + logger.error("{} REMOVE-ENCODERS FAILED because of {}", this, e.getMessage(), e); + } + } + + @Override + public boolean isAlive() { + return this.alive; + } + + @Override + public boolean offer(String topic, String event) { + logger.debug("{}: OFFER: {} <- {}", this, topic, event); + + if (this.locked) { + return true; + } + if (!this.alive) { + return true; + } + + // 0. Check if the policy container has any sessions + + if (this.policyContainer.getPolicySessions().isEmpty()) { + // no sessions + return true; + } + + // 1. Now, check if this topic has a decoder: + + if (!EventProtocolCoder.manager.isDecodingSupported(this.getGroupId(), + this.getArtifactId(), + topic)) { + + logger.warn("{}: DECODING-UNSUPPORTED {}:{}:{}", this, + topic, this.getGroupId(), this.getArtifactId()); + return true; + } + + // 2. Decode + + Object anEvent; + try { + anEvent = EventProtocolCoder.manager.decode(this.getGroupId(), + this.getArtifactId(), + topic, + event); + } catch (UnsupportedOperationException uoe) { + logger.debug("{}: DECODE FAILED: {} <- {} because of {}", this, topic, + event, uoe.getMessage(), uoe); + return true; + } catch (Exception e) { + logger.warn("{}: DECODE FAILED: {} <- {} because of {}", this, topic, + event, e.getMessage(), e); + return true; + } + + synchronized (this.recentSourceEvents) { + this.recentSourceEvents.add(anEvent); + } + + // increment event count for Nagios monitoring + PdpJmx.getInstance().updateOccured(); + + // Broadcast + + if (logger.isInfoEnabled()) { + logger.info("{} BROADCAST-INJECT of {} FROM {} INTO {}", + this, event, topic, this.policyContainer.getName()); + } + + for (DroolsControllerFeatureAPI feature : DroolsControllerFeatureAPI.providers.getList()) { + try { + if (feature.beforeInsert(this, anEvent)) { + return true; + } + } catch (Exception e) { + logger.error("{}: feature {} before-insert failure because of {}", + this, feature.getClass().getName(), e.getMessage(), e); + } + } + + boolean successInject = this.policyContainer.insertAll(anEvent); + if (!successInject) { + logger.warn(this + "Failed to inject into PolicyContainer {}", this.getSessionNames()); + } + + for (DroolsControllerFeatureAPI feature : DroolsControllerFeatureAPI.providers.getList()) { + try { + if (feature.afterInsert(this, anEvent, successInject)) { + return true; + } + } catch (Exception e) { + logger.error("{}: feature {} after-insert failure because of {}", + this, feature.getClass().getName(), e.getMessage(), e); + } + } + + return true; + } + + @Override + public boolean deliver(TopicSink sink, Object event) { + + if (logger.isInfoEnabled()) { + logger.info("{}DELIVER: {} FROM {} TO {}", this, event, this, sink); + } + + if (sink == null) { + throw new IllegalArgumentException(this + " invalid sink"); + } + + if (event == null) { + throw new IllegalArgumentException(this + " invalid event"); + } - List factObjects = new ArrayList<>(); - - QueryResults queryResults = kieSession.getQueryResults(queryName, queryParams); - for (QueryResultsRow row : queryResults) { - try { - factObjects.add(row.get(queriedEntity)); - if (delete) - kieSession.delete(row.getFactHandle(queriedEntity)); - } catch (Exception e) { - logger.warn("Object cannot be retrieved from row: {}", row, e); - } - } - - return factObjects; - } - - @Override - public Class fetchModelClass(String className) { - return ReflectionUtil.fetchClass(this.policyContainer.getClassLoader(), className); - } - - /** - * @return the recentSourceEvents - */ - @Override - public Object[] getRecentSourceEvents() { - synchronized(this.recentSourceEvents) { - Object[] events = new Object[recentSourceEvents.size()]; - return recentSourceEvents.toArray(events); - } - } - - /** - * @return the recentSinkEvents - */ - @Override - public String[] getRecentSinkEvents() { - synchronized(this.recentSinkEvents) { - String[] events = new String[recentSinkEvents.size()]; - return recentSinkEvents.toArray(events); - } - } - - @Override - public boolean isBrained() { - return true; - } - - - @Override - public String toString() { - StringBuilder builder = new StringBuilder(); - builder.append("MavenDroolsController [policyContainer=") - .append((policyContainer != null) ? policyContainer.getName() : "NULL").append(":") - .append(", alive=") - .append(alive).append(", locked=") - .append(", modelClassLoaderHash=").append(modelClassLoaderHash).append("]"); - return builder.toString(); - } + if (this.locked) { + throw new IllegalStateException(this + " is locked"); + } + + if (!this.alive) { + throw new IllegalStateException(this + " is stopped"); + } + + String json = + EventProtocolCoder.manager.encode(sink.getTopic(), event, this); + + synchronized (this.recentSinkEvents) { + this.recentSinkEvents.add(json); + } + + return sink.send(json); + + } + + @Override + public String getVersion() { + return this.policyContainer.getVersion(); + } + + @Override + public String getArtifactId() { + return this.policyContainer.getArtifactId(); + } + + @Override + public String getGroupId() { + return this.policyContainer.getGroupId(); + } + + /** + * Get model class loader hash. + * + * @return the modelClassLoaderHash + */ + public int getModelClassLoaderHash() { + return modelClassLoaderHash; + } + + @Override + public synchronized boolean lock() { + logger.info("LOCK: {}", this); + + this.locked = true; + return true; + } + + @Override + public synchronized boolean unlock() { + logger.info("UNLOCK: {}", this); + + this.locked = false; + return true; + } + + @Override + public boolean isLocked() { + return this.locked; + } + + @JsonIgnore + @Override + public PolicyContainer getContainer() { + return this.policyContainer; + } + + @JsonProperty("sessions") + @Override + public List getSessionNames() { + return getSessionNames(true); + } + + /** + * get session names. + * + * @param abbreviated true for the short form, otherwise the long form + * @return session names + */ + protected List getSessionNames(boolean abbreviated) { + List sessionNames = new ArrayList<>(); + try { + for (PolicySession session: this.policyContainer.getPolicySessions()) { + if (abbreviated) { + sessionNames.add(session.getName()); + } else { + sessionNames.add(session.getFullName()); + } + } + } catch (Exception e) { + logger.warn("Can't retrieve CORE sessions: " + e.getMessage(), e); + sessionNames.add(e.getMessage()); + } + return sessionNames; + } + + @JsonProperty("sessionCoordinates") + @Override + public List getCanonicalSessionNames() { + return getSessionNames(false); + } + + /** + * provides the underlying core layer container sessions. + * + * @return the attached Policy Container + */ + protected List getSessions() { + List sessions = new ArrayList<>(); + sessions.addAll(this.policyContainer.getPolicySessions()); + return sessions; + } + + /** + * provides the underlying core layer container session with name sessionName. + * + * @param sessionName session name + * @return the attached Policy Container + * @throws IllegalArgumentException when an invalid session name is provided + * @throws IllegalStateException when the drools controller is in an invalid state + */ + protected PolicySession getSession(String sessionName) { + if (sessionName == null || sessionName.isEmpty()) { + throw new IllegalArgumentException("A Session Name must be provided"); + } + + List sessions = this.getSessions(); + for (PolicySession session : sessions) { + if (sessionName.equals(session.getName()) || sessionName.equals(session.getFullName())) { + return session; + } + } + + throw invalidSessNameEx(sessionName); + } + + private IllegalArgumentException invalidSessNameEx(String sessionName) { + return new IllegalArgumentException("Invalid Session Name: " + sessionName); + } + + @Override + public Map factClassNames(String sessionName) { + if (sessionName == null || sessionName.isEmpty()) { + throw invalidSessNameEx(sessionName); + } + + Map classNames = new HashMap<>(); + + PolicySession session = getSession(sessionName); + KieSession kieSession = session.getKieSession(); + + Collection facts = session.getKieSession().getFactHandles(); + for (FactHandle fact : facts) { + try { + String className = kieSession.getObject(fact).getClass().getName(); + if (classNames.containsKey(className)) { + classNames.put(className, classNames.get(className) + 1); + } else { + classNames.put(className, 1); + } + } catch (Exception e) { + logger.warn("Object cannot be retrieved from fact {}", fact, e); + } + } + + return classNames; + } + + @Override + public long factCount(String sessionName) { + if (sessionName == null || sessionName.isEmpty()) { + throw invalidSessNameEx(sessionName); + } + + PolicySession session = getSession(sessionName); + return session.getKieSession().getFactCount(); + } + + @Override + public List facts(String sessionName, String className, boolean delete) { + if (sessionName == null || sessionName.isEmpty()) { + throw invalidSessNameEx(sessionName); + } + + if (className == null || className.isEmpty()) { + throw new IllegalArgumentException("Invalid Class Name: " + className); + } + + Class factClass = + ReflectionUtil.fetchClass(this.policyContainer.getClassLoader(), className); + if (factClass == null) { + throw new IllegalArgumentException("Class cannot be fetched in model's classloader: " + className); + } + + PolicySession session = getSession(sessionName); + KieSession kieSession = session.getKieSession(); + + List factObjects = new ArrayList<>(); + + Collection factHandles = kieSession.getFactHandles(new ClassObjectFilter(factClass)); + for (FactHandle factHandle : factHandles) { + try { + factObjects.add(kieSession.getObject(factHandle)); + if (delete) { + kieSession.delete(factHandle); + } + } catch (Exception e) { + logger.warn("Object cannot be retrieved from fact {}", factHandle, e); + } + } + + return factObjects; + } + + @Override + public List factQuery(String sessionName, String queryName, String queriedEntity, + boolean delete, Object... queryParams) { + if (sessionName == null || sessionName.isEmpty()) { + throw invalidSessNameEx(sessionName); + } + + if (queryName == null || queryName.isEmpty()) { + throw new IllegalArgumentException("Invalid Query Name: " + queryName); + } + + if (queriedEntity == null || queriedEntity.isEmpty()) { + throw new IllegalArgumentException("Invalid Queried Entity: " + queriedEntity); + } + + PolicySession session = getSession(sessionName); + KieSession kieSession = session.getKieSession(); + + boolean found = false; + for (KiePackage kiePackage : kieSession.getKieBase().getKiePackages()) { + for (Query q : kiePackage.getQueries()) { + if (q.getName() != null && q.getName().equals(queryName)) { + found = true; + break; + } + } + } + if (!found) { + throw new IllegalArgumentException("Invalid Query Name: " + queryName); + } + + List factObjects = new ArrayList<>(); + + QueryResults queryResults = kieSession.getQueryResults(queryName, queryParams); + for (QueryResultsRow row : queryResults) { + try { + factObjects.add(row.get(queriedEntity)); + if (delete) { + kieSession.delete(row.getFactHandle(queriedEntity)); + } + } catch (Exception e) { + logger.warn("Object cannot be retrieved from row: {}", row, e); + } + } + + return factObjects; + } + + @Override + public Class fetchModelClass(String className) { + return ReflectionUtil.fetchClass(this.policyContainer.getClassLoader(), className); + } + + /** + * Get recent source events. + * + * @return the recentSourceEvents + */ + @Override + public Object[] getRecentSourceEvents() { + synchronized (this.recentSourceEvents) { + Object[] events = new Object[recentSourceEvents.size()]; + return recentSourceEvents.toArray(events); + } + } + + /** + * Get recent sink events. + * + * @return the recentSinkEvents + */ + @Override + public String[] getRecentSinkEvents() { + synchronized (this.recentSinkEvents) { + String[] events = new String[recentSinkEvents.size()]; + return recentSinkEvents.toArray(events); + } + } + + @Override + public boolean isBrained() { + return true; + } + + @Override + public String toString() { + StringBuilder builder = new StringBuilder(); + builder + .append("MavenDroolsController [policyContainer=") + .append((policyContainer != null) ? policyContainer.getName() : "NULL") + .append(":") + .append(", alive=") + .append(alive) + .append(", locked=") + .append(", modelClassLoaderHash=") + .append(modelClassLoaderHash) + .append("]"); + return builder.toString(); + } } -- cgit 1.2.3-korg