diff options
author | 2017-07-28 08:23:01 +0000 | |
---|---|---|
committer | 2017-07-28 08:23:30 +0000 | |
commit | 6abeb297254942c48722c2da0e7c355d523fe307 (patch) | |
tree | f2b006ec6ca8804633e2f74a6f1b40c90683f1ea /policy-core/src/main/java/org/onap/policy/drools/core | |
parent | d1d749ae390c276fc10c4985d0080f0a9ff7ff35 (diff) |
[POLICY-72] replace openecomp for drools-pdp
Change-Id: I8aa8e32d3ba10f7c655b50e97aaf6865514d4777
Signed-off-by: Guo Ruijing <ruijing.guo@intel.com>
Diffstat (limited to 'policy-core/src/main/java/org/onap/policy/drools/core')
6 files changed, 1672 insertions, 0 deletions
diff --git a/policy-core/src/main/java/org/onap/policy/drools/core/PolicyContainer.java b/policy-core/src/main/java/org/onap/policy/drools/core/PolicyContainer.java new file mode 100644 index 00000000..cf94bfcb --- /dev/null +++ b/policy-core/src/main/java/org/onap/policy/drools/core/PolicyContainer.java @@ -0,0 +1,848 @@ +/*- + * ============LICENSE_START======================================================= + * policy-core + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.drools.core; + +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; + +import org.kie.api.KieBase; +import org.kie.api.KieServices; +import org.kie.api.builder.KieScanner; +import org.kie.api.builder.Message; +import org.kie.api.builder.ReleaseId; +import org.kie.api.builder.Results; +import org.kie.api.runtime.KieContainer; +import org.kie.api.runtime.KieSession; +import org.onap.policy.drools.properties.Startable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This class is a wrapper around 'KieContainer', which adds the ability + * to automatically create and track KieSession instances. + */ +public class PolicyContainer implements Startable +{ + // get an instance of logger + private static Logger logger = LoggerFactory.getLogger(PolicyContainer.class); + // 'KieServices' singleton + static private KieServices kieServices = KieServices.Factory.get(); + + // set of all 'PolicyContainer' instances + static private HashSet<PolicyContainer> containers = + new HashSet<PolicyContainer>(); + + // maps feature objects to per-PolicyContainer data + private ConcurrentHashMap<Object, Object> adjuncts = + new ConcurrentHashMap<Object, Object>(); + + // 'KieContainer' associated with this 'PolicyContainer' + private KieContainer kieContainer; + + // indicates whether the PolicyContainer is 'started' + // (started = sessions created, threads running) + private volatile boolean isStarted = false; + + // maps session name into the associated 'PolicySession' instance + private HashMap<String, PolicySession> sessions = + new HashMap<String, PolicySession>(); + + // if not null, this is a 'KieScanner' looking for updates + private KieScanner scanner = null; + + // indicates whether the scanner has been started + // (it can block for a long time) + private boolean scannerStarted = false; + + /** + * uses 'groupId', 'artifactId' and 'version', and fetches the associated + * artifact and remaining dependencies from the Maven repository to create + * the 'PolicyContainer' and associated 'KieContainer'. + * + * An exception occurs if the creation of the 'KieContainer' fails. + * + * @param groupId the 'groupId' associated with the artifact + * @param artifactId the artifact name + * @param version a comma-separated list of possible versions + */ + public PolicyContainer(String groupId, String artifactId, String version) + { + this(kieServices.newReleaseId(groupId, artifactId, version)); + } + + /** + * uses the 'groupId', 'artifactId' and 'version' information in 'ReleaseId', + * and fetches the associated artifact and remaining dependencies from the + * Maven repository to create the 'PolicyContainer' and associated + * 'KieContainer'. + * + * An exception occurs if the creation of the 'KieContainer' fails. + * + * @param releaseId indicates the artifact that is to be installed in this + * container + */ + public PolicyContainer(ReleaseId releaseId) + { + if (releaseId.getVersion().contains(",")) + { + // this is actually a comma-separated list of release ids + releaseId = loadArtifact(releaseId.getGroupId(), + releaseId.getArtifactId(), + releaseId.getVersion()); + } + else + { + kieContainer = kieServices.newKieContainer(releaseId); + } + synchronized(containers) + { + if(releaseId != null){ + logger.info("Add a new kieContainer in containers: releaseId: " + releaseId.toString()); + }else{ + logger.warn("input releaseId is null"); + } + containers.add(this); + } + // 'startScanner(releaseId)' was called at this point, but we have seen + // at least one case where the Drools container was repeatedly updated + // every 60 seconds. It isn't clear what conditions resulted in this + // behavior, so the call was removed. If needed, it can be explicitly + // called from a feature. + } + + /** + * Load an artifact into a new KieContainer. This method handles the + * case where the 'version' is actually a comma-separated list of + * versions. + * + * @param groupId the 'groupId' associated with the artifact + * @param artifactId the artifact name + * @param version a comma-separated list of possible versions + */ + private ReleaseId loadArtifact + (String groupId, String artifactId, String version) + { + String[] versions = version.split(","); + if (versions.length > 1) + { + logger.info("Multiple KieContainer versions are specified: " + + version); + } + + // indicates a 'newKieContainer' call failed + RuntimeException exception = null; + + // set prior to every 'newKieContainer' invocation + // (if we are able to create the container, it will be the last + // one that was successful) + ReleaseId releaseId = null; + for (String ver : versions) + { + try + { + // Create a 'ReleaseId' object describing the artifact, and + // create a 'KieContainer' based upon it. + logger.info("Create new KieContainer start, version = " + + ver + " ..."); + + releaseId = kieServices.newReleaseId(groupId, artifactId, ver); + kieContainer = kieServices.newKieContainer(releaseId); + + // clear any exception, and break out of the loop + exception = null; + break; + } + catch (RuntimeException e) + { + exception = e; + } + } + if (exception != null) + { + // all of the 'newKieContainer' invocations failed -- throw the + // most recent exception + throw(exception); + } + return(releaseId); + } + + /** + * @return the name of the container, which is the String equivalent of + * the 'ReleaseId'. It has the form: + * + * (groupId + ":" + artifactId + ":" + version) + * + * Note that the name changes after a successful call to 'updateToVersion', + * although typically only the 'version' part changes. + */ + public String getName() + { + return(kieContainer.getReleaseId().toString()); + } + + /** + * @return the associated 'KieContainer' instance + */ + public KieContainer getKieContainer() + { + return(kieContainer); + } + + /** + * @return the 'ClassLoader' associated with the 'KieContainer' instance + */ + public ClassLoader getClassLoader() + { + return(kieContainer.getClassLoader()); + } + + /** + * @return the Maven GroupId of the top-level artifact wrapped + * by the container. + */ + public String getGroupId() + { + return(kieContainer.getReleaseId().getGroupId()); + } + + /** + * @return the Maven ArtifactId of the top-level artifact wrapped + * by the container. + */ + public String getArtifactId() + { + return(kieContainer.getReleaseId().getArtifactId()); + } + + /** + * @return the version of the top-level artifact wrapped by the + * container (this may change as updates occur) + */ + public String getVersion() + { + return(kieContainer.getReleaseId().getVersion()); + } + + /** + * Fetch the named 'PolicySession'. + * + * @param name the name of the KieSession (which is also the name of + * the associated PolicySession) + * @return a PolicySession if found, 'null' if not + */ + public PolicySession getPolicySession(String name) + { + return(sessions.get(name)); + } + + /** + * Internal method to create a PolicySession, possibly restoring it + * from persistent storage. + * + * @param name of the KieSession and PolicySession + * @param kieBaseName name of the associated 'KieBase' instance + * @return a new or existing PolicySession, or 'null' if not found + */ + private PolicySession activatePolicySession(String name, String kieBaseName) + { + synchronized(sessions) + { + logger.info("activatePolicySession:name :" + name); + PolicySession session = sessions.get(name); + if (session == null) + { + KieSession kieSession = null; + + // loop through all of the features, and give each one + // a chance to create the 'KieSession' + for (PolicySessionFeatureAPI feature : + PolicySessionFeatureAPI.impl.getList()) + { + try + { + if ((kieSession = feature.activatePolicySession + (this, name, kieBaseName)) != null) + break; + } + catch (Exception e) + { + logger.error("ERROR: Feature API: " + + feature.getClass().getName(), e); + } + } + + // if none of the features created the session, create one now + if (kieSession == null) + { + kieSession = kieContainer.newKieSession(name); + } + + if (kieSession != null) + { + // creation of 'KieSession' was successful - build + // a PolicySession + session = new PolicySession(name, this, kieSession); + sessions.put(name, session); + + // notify features + for (PolicySessionFeatureAPI feature : + PolicySessionFeatureAPI.impl.getList()) + { + try + { + feature.newPolicySession(session); + } + catch (Exception e) + { + logger.error("ERROR: Feature API: " + + feature.getClass().getName(), e); + } + } + logger.info("activatePolicySession:new session was added in sessions with name " + name); + } + } + logger.info("activatePolicySession:session - " + + (session == null ? "null" : session.getFullName()) + + " is returned."); + return(session); + } + } + + /** + * This creates a 'PolicySession' instance within this 'PolicyContainer', + * and ties it to the specified 'KieSession'. 'name' must not currently + * exist within the 'PolicyContainer', and the 'KieBase' object associated + * with 'KieSession' must belong to the 'KieContainer'. This method provides + * a way for 'KieSession' instances that are created programmatically to fit + * into this framework. + * + * @param name the name for the new 'PolicySession' + * @param kieSession a 'KieSession' instance, that will be included in + * this infrastructure + * @return the new 'PolicySession' + * @throws IllegalArgumentException if 'kieSession' does not reside within + * this container + * @throws IllegalStateException if a 'PolicySession' already exists + * with this name + */ + public PolicySession adoptKieSession(String name, KieSession kieSession) + throws IllegalArgumentException, IllegalStateException + { + + if(name == null){ + logger.warn("adoptKieSession:input name is null"); + throw(new IllegalArgumentException + ("KieSession input name is null " + + getName())); + }else if(kieSession == null){ + logger.warn("adoptKieSession:input kieSession is null"); + throw(new IllegalArgumentException + ("KieSession '" + name + "' is null " + + getName())); + }else { + logger.info("adoptKieSession:name: " + name + " kieSession: " + kieSession); + } + // fetch KieBase, and verify it belongs to this KieContainer + boolean match = false; + KieBase kieBase = kieSession.getKieBase(); + logger.info("adoptKieSession:kieBase: " + kieBase); + for (String kieBaseName : kieContainer.getKieBaseNames()) + { + logger.info("adoptKieSession:kieBaseName: " + kieBaseName); + if (kieBase == kieContainer.getKieBase(kieBaseName)) + { + match = true; + break; + } + } + logger.info("adoptKieSession:match " + match); + // if we don't have a match yet, the last chance is to look at the + // default KieBase, if it exists + if (!match && kieBase != kieContainer.getKieBase()) + { + throw(new IllegalArgumentException + ("KieSession '" + name + "' does not reside within container " + + getName())); + } + + synchronized (sessions) + { + if (sessions.get(name) != null) + { + throw(new IllegalStateException + ("PolicySession '" + name + "' already exists")); + } + + // create the new 'PolicySession', add it to the table, + // and return the object to the caller + logger.info("adoptKieSession:create a new policySession with name " + name); + PolicySession policySession = + new PolicySession(name, this, kieSession); + sessions.put(name, policySession); + + // notify features + for (PolicySessionFeatureAPI feature : + PolicySessionFeatureAPI.impl.getList()) + { + try + { + feature.newPolicySession(policySession); + } + catch (Exception e) + { + logger.error("ERROR: Feature API: " + + feature.getClass().getName(), e); + } + } + return(policySession); + } + } + + /** + * This call 'KieContainer.updateToVersion()', and returns the associated + * response as a String. If successful, the name of this 'PolicyContainer' + * changes to match the new version. + * + * @param newVersion this is the version to update to (the 'groupId' + * and 'artifactId' remain the same) + * @return the list of messages associated with the update (not sure if + * this can be 'null', or how to determine success/failure) + */ + public String updateToVersion(String newVersion) + { + ReleaseId releaseId = kieContainer.getReleaseId(); + Results results = this.updateToVersion + (kieServices.newReleaseId(releaseId.getGroupId(), + releaseId.getArtifactId(), + newVersion)); + + List<Message> messages = (results == null ? null : results.getMessages()); + return(messages == null ? null : messages.toString()); + } + + /** + * This calls 'KieContainer.updateToVersion()', and returns the associated + * response. If successful, the name of this 'PolicyContainer' changes to + * match the new version. + * + * @param releaseId the new artifact (usually new version) to be installed + * @return the 'Results' parameter from 'KieContainer.updateToVersion' + */ + public Results updateToVersion(ReleaseId releaseId) + { + if(releaseId == null){ + logger.warn("updateToVersion:input releaseId is null"); + }else { + logger.info("updateToVersion:releaseId " + releaseId.toString()); + } + + // notify all 'PolicySession' instances + Results results = kieContainer.updateToVersion(releaseId); + for (PolicySession session : sessions.values()) + { + session.updated(); + } + + return(results); + } + + /** + * @return all existing 'PolicyContainer' instances + */ + public static Collection<PolicyContainer> getPolicyContainers() + { + synchronized(containers) + { + return(new HashSet<PolicyContainer>(containers)); + } + } + + /** + * @return all of the 'PolicySession' instances + */ + public Collection<PolicySession> getPolicySessions() + { + // KLUDGE WARNING: this is a temporary workaround -- if there are + // no features, we don't have persistence, and 'activate' is never + // called. In this case, make sure the container is started. + if (PolicySessionFeatureAPI.impl.getList().size() == 0) + { + start(); + } + + // return current set of PolicySessions + synchronized(sessions) + { + return(new HashSet<PolicySession>(sessions.values())); + } + } + + /** + * This method will start a 'KieScanner' (if not currently running), + * provided that the ReleaseId version is 'LATEST' or 'RELEASE', + * or refers to a SNAPSHOT version. + * + * @param releaseId the release id used to create the container + */ + public synchronized void startScanner(ReleaseId releaseId) + { + String version = releaseId.getVersion(); + if (scannerStarted == false && scanner == null && version != null + && (version.equals("LATEST") || version.equals("RELEASE") + || version.endsWith("-SNAPSHOT"))) + { + // create the scanner, and poll at 60 second intervals + try + { + scannerStarted = true; + + // start this in a separate thread -- it can block for a long time + new Thread("Scanner Starter " + getName()) + { + public void run() + { + scanner = kieServices.newKieScanner(kieContainer); + scanner.start(60000L); + } + }.start(); + } + catch (Exception e) + { + // sometimes the scanner initialization fails for some reason + logger.error("startScanner error", e); + } + } + } + + /** + * Insert a fact into a specific named session + * + * @param name this is the session name + * @param object this is the fact to be inserted into the session + * @return 'true' if the named session was found, 'false' if not + */ + public boolean insert(String name, Object object) + { + // TODO: Should the definition of 'name' be expanded to include an + // alternate entry point as well? For example, 'name.entryPoint' (or + // something other than '.' if that is a problem). + synchronized (sessions) + { + PolicySession session = sessions.get(name); + if (session != null) + { + session.getKieSession().insert(object); + return(true); + } + } + return(false); + } + + /** + * Insert a fact into all sessions associated with this container + * + * @param object this is the fact to be inserted into the sessions + * @return 'true' if the fact was inserted into at least one session, + * 'false' if not + */ + public boolean insertAll(Object object) + { + boolean rval = false; + synchronized (sessions) + { + for (PolicySession session : sessions.values()) + { + session.getKieSession().insert(object); + rval = true; + } + } + return(rval); + } + + /*************************/ + /* 'Startable' interface */ + /*************************/ + + /** + * {@inheritDoc} + */ + public synchronized boolean start() + { + if (!isStarted) + { + // This will create all 'PolicySession' instances specified in the + // 'kmodule.xml' file that don't exist yet + for (String kieBaseName : kieContainer.getKieBaseNames()) + { + for (String kieSessionName : + kieContainer.getKieSessionNamesInKieBase(kieBaseName)) + { + // if the 'PolicySession' does not currently exist, this method + // call will attempt to create it + PolicySession session = + activatePolicySession(kieSessionName, kieBaseName); + if (session != null) + { + session.startThread(); + } + } + } + isStarted = true; + } + return(true); + } + + /** + * {@inheritDoc} + */ + public synchronized boolean stop() + { + if (isStarted) + { + Collection<PolicySession> localSessions; + + synchronized (sessions) + { + // local set containing all of the sessions + localSessions = new HashSet<PolicySession>(sessions.values()); + + // clear the 'name->session' map in 'PolicyContainer' + sessions.clear(); + } + for (PolicySession session : localSessions) + { + // stop session thread + session.stopThread(); + + // free KieSession resources + session.getKieSession().dispose(); + + // notify features + for (PolicySessionFeatureAPI feature : + PolicySessionFeatureAPI.impl.getList()) + { + try + { + feature.disposeKieSession(session); + } + catch (Exception e) + { + logger.error("ERROR: Feature API: " + + feature.getClass().getName(), e); + } + } + } + isStarted = false; + } + return(true); + } + + /** + * {@inheritDoc} + */ + public synchronized void shutdown() + { + // Note that this method does not call 'destroy' on the 'KieSession' + // instances, which would remove any associated information in persistent + // storage. Should it do this? + + stop(); + synchronized(containers) + { + containers.remove(this); + } + + // How do we free the resources associated with the KieContainer? + // Is garbage collection sufficient? + } + + /** + * {@inheritDoc} + */ + public boolean isAlive() + { + return(isStarted); + } + + /*************************/ + + /** + * This method is similar to 'shutdown', but it also frees any persistence + * resources as well. + */ + public synchronized void destroy() + { + // we need all KieSession instances running in order to free + // resources associated with persistence + start(); + Collection<PolicySession> localSessions; + + synchronized (sessions) + { + // local set containing all of the sessions + localSessions = new HashSet<PolicySession>(sessions.values()); + + // clear the 'name->session' map in 'PolicyContainer' + sessions.clear(); + } + for (PolicySession session : localSessions) + { + // stop session thread + session.stopThread(); + + // free KieSession resources + session.getKieSession().destroy(); + + // notify features + for (PolicySessionFeatureAPI feature : + PolicySessionFeatureAPI.impl.getList()) + { + try + { + feature.destroyKieSession(session); + } + catch (Exception e) + { + logger.error("ERROR: Feature API: " + + feature.getClass().getName(), e); + } + } + } + isStarted = false; + + synchronized(containers) + { + containers.remove(this); + } + + // How do we free the resources associated with the KieContainer? + // Is garbage collection sufficient? + } + + /** + * This method is called when the host goes from the 'standby->active' state. + */ + static public void activate() + { + // start all of the 'PolicyContainer' instances + for (PolicyContainer container : containers) + { + try + { + container.start(); + } + catch (Exception e) + { + logger.error("PolicyContainer.start() error in activate", e); + } + } + } + + /** + * This method is called when the host goes from the 'active->standby' state. + */ + static public void deactivate() + { + // deactivate all of the 'PolicyContainer' instances + for (PolicyContainer container : containers) + { + try + { + container.stop(); + } + catch (Exception e) + { + logger.error("PolicyContainer.start() error in deactivate", e); + } + } + } + + /** + * This method does the following: + * + * 1) Initializes logging + * 2) Starts the DroolsPDP Integrity Monitor + * 3) Initilaizes persistence + * + * It no longer reads in properties files, o creates 'PolicyContainer' + * instances. + * + * @param args standard 'main' arguments, which are currently ignored + */ + public static void globalInit(String args[]) + { + String configDir = "config"; + logger.info("PolicyContainer.main: configDir=" + configDir); + + // invoke 'globalInit' on all of the features + for (PolicySessionFeatureAPI feature : + PolicySessionFeatureAPI.impl.getList()) + { + try + { + feature.globalInit(args, configDir); + } + catch (Exception e) + { + logger.error("ERROR: Feature API: " + + feature.getClass().getName(), e); + } + } + } + + /** + * Fetch the adjunct object associated with a given feature + * + * @param object this is typically the singleton feature object that is + * used as a key, but it might also be useful to use nested objects + * within the feature as keys. + * @return a feature-specific object associated with the key, or 'null' + * if it is not found. + */ + public Object getAdjunct(Object object) + { + return(adjuncts.get(object)); + } + + /** + * Store the adjunct object associated with a given feature + * + * @param object this is typically the singleton feature object that is + * used as a key, but it might also be useful to use nested objects + * within the feature as keys. + * @param value a feature-specific object associated with the key, or 'null' + * if the feature-specific object should be removed + */ + public void setAdjunct(Object object, Object value) + { + if (value == null) + { + adjuncts.remove(object); + } + else + { + adjuncts.put(object, value); + } + } +} diff --git a/policy-core/src/main/java/org/onap/policy/drools/core/PolicySession.java b/policy-core/src/main/java/org/onap/policy/drools/core/PolicySession.java new file mode 100644 index 00000000..77e16804 --- /dev/null +++ b/policy-core/src/main/java/org/onap/policy/drools/core/PolicySession.java @@ -0,0 +1,576 @@ +/*- + * ============LICENSE_START======================================================= + * policy-core + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.drools.core; + +import java.util.concurrent.ConcurrentHashMap; + +import org.kie.api.event.rule.AfterMatchFiredEvent; +import org.kie.api.event.rule.AgendaEventListener; +import org.kie.api.event.rule.AgendaGroupPoppedEvent; +import org.kie.api.event.rule.AgendaGroupPushedEvent; +import org.kie.api.event.rule.BeforeMatchFiredEvent; +import org.kie.api.event.rule.MatchCancelledEvent; +import org.kie.api.event.rule.MatchCreatedEvent; +import org.kie.api.event.rule.ObjectDeletedEvent; +import org.kie.api.event.rule.ObjectInsertedEvent; +import org.kie.api.event.rule.ObjectUpdatedEvent; +import org.kie.api.event.rule.RuleFlowGroupActivatedEvent; +import org.kie.api.event.rule.RuleFlowGroupDeactivatedEvent; +import org.kie.api.event.rule.RuleRuntimeEventListener; +import org.kie.api.runtime.KieSession; +import org.onap.policy.drools.core.jmx.PdpJmx; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * This class is a wrapper around 'KieSession', which adds the following: + * + * 1) A thread running 'KieSession.fireUntilHalt()' + * 2) Access to UEB + * 3) Logging of events + */ +public class PolicySession + implements AgendaEventListener, RuleRuntimeEventListener +{ + // get an instance of logger + private static Logger logger = LoggerFactory.getLogger(PolicySession.class); + // name of the 'PolicySession' and associated 'KieSession' + private String name; + + // the associated 'PolicyContainer', which may have additional + // 'PolicySession' instances in addition to this one + private PolicyContainer container; + + // maps feature objects to per-PolicyContainer data + private ConcurrentHashMap<Object, Object> adjuncts = + new ConcurrentHashMap<Object, Object>(); + + // associated 'KieSession' instance + private KieSession kieSession; + + // if not 'null', this is the thread model processing the 'KieSession' + private ThreadModel threadModel = null; + + // supports 'getCurrentSession()' method + static private ThreadLocal<PolicySession> policySession = + new ThreadLocal<PolicySession>(); + + /** + * Internal constructor - create a 'PolicySession' instance + * + * @param name the name of this 'PolicySession' (and 'kieSession') + * @param container the 'PolicyContainer' instance containing this session + * @param kieSession the associated 'KieSession' instance + */ + protected PolicySession(String name, + PolicyContainer container, KieSession kieSession) + { + this.name = name; + this.container = container; + this.kieSession = kieSession; + kieSession.addEventListener((AgendaEventListener)this); + kieSession.addEventListener((RuleRuntimeEventListener)this); + } + + /** + * @return the 'PolicyContainer' object containing this session + */ + public PolicyContainer getPolicyContainer() + { + return(container); + } + + /** + * @return the associated 'KieSession' instance + */ + public KieSession getKieSession() + { + return(kieSession); + } + + /** + * @return the local name of this session, which should either match the + * name specified in 'kmodule.xml' file associated with this session, or the + * name passed on the 'PolicyContainer.adoptKieSession' method. + */ + public String getName() + { + return(name); + } + + /** + * @return the 'PolicyContainer' name, followed by ':', followed by the + * local name of the session. It should be useful in log messages. + */ + public String getFullName() + { + return(container.getName() + ":" + name); + } + + /** + * If no 'ThreadModel' is currently running, this method will create one, + * and invoke it's 'start()' method. Features implementing + * 'PolicySessionFeatureAPI.selectThreadModel(...)' get a chance to create + * the ThreadModel instance. + */ + public synchronized void startThread() + { + if (threadModel == null) + { + // loop through all of the features, and give each one + // a chance to create the 'ThreadModel' + for (PolicySessionFeatureAPI feature : + PolicySessionFeatureAPI.impl.getList()) + { + try + { + if ((threadModel = feature.selectThreadModel(this)) != null) + break; + } + catch (Exception e) + { + logger.error("ERROR: Feature API: " + + feature.getClass().getName(), e); + } + } + if (threadModel == null) + { + // no feature created a ThreadModel -- select the default + threadModel = new DefaultThreadModel(this); + } + logger.info("starting ThreadModel for session " + getFullName()); + threadModel.start(); + } + } + + /** + * If a 'ThreadModel' is currently running, this calls the 'stop()' method, + * and sets the 'threadModel' reference to 'null'. + */ + public synchronized void stopThread() + { + if (threadModel != null) + { + threadModel.stop(); + threadModel = null; + } + } + + /** + * Notification that 'updateToVersion' was called on the container + */ + void updated() + { + if (threadModel != null) + { + // notify the 'ThreadModel', which may change one or more Thread names + threadModel.updated(); + } + } + + /** + * Set this 'PolicySession' instance as the one associated with the + * currently-running thread. + */ + public void setPolicySession() + { + // this sets a 'ThreadLocal' variable + policySession.set(this); + } + + /** + * @return the 'PolicySession' instance associated with the current thread + * (Note that this only works if the current thread is the one running + * 'kieSession.fireUntilHalt()'.) + */ + public static PolicySession getCurrentSession() + { + return(policySession.get()); + } + + /** + * Fetch the adjunct object associated with a given feature + * + * @param object this is typically the singleton feature object that is + * used as a key, but it might also be useful to use nested objects + * within the feature as keys. + * @return a feature-specific object associated with the key, or 'null' + * if it is not found. + */ + public Object getAdjunct(Object object) + { + return(adjuncts.get(object)); + } + + /** + * Store the adjunct object associated with a given feature + * + * @param object this is typically the singleton feature object that is + * used as a key, but it might also be useful to use nested objects + * within the feature as keys. + * @param value a feature-specific object associated with the key, or 'null' + * if the feature-specific object should be removed + */ + public void setAdjunct(Object object, Object value) + { + if (value == null) + { + adjuncts.remove(object); + } + else + { + adjuncts.put(object, value); + } + } + + /***********************************/ + /* 'AgendaEventListener' interface */ + /***********************************/ + + /** + * {@inheritDoc} + */ + @Override + public void afterMatchFired(AfterMatchFiredEvent event) + { + if (logger.isDebugEnabled()) + { + logger.debug("afterMatchFired: " + getFullName() + + ": AgendaEventListener.afterMatchFired(" + event + ")"); + } + PdpJmx.getInstance().ruleFired(); + } + + /** + * {@inheritDoc} + */ + @Override + public void afterRuleFlowGroupActivated(RuleFlowGroupActivatedEvent event) + { + if (logger.isDebugEnabled()) + { + logger.debug("afterRuleFlowGroupActivated: " + getFullName() + + ": AgendaEventListener.afterRuleFlowGroupActivated(" + + event + ")"); + } + } + + /** + * {@inheritDoc} + */ + @Override + public void afterRuleFlowGroupDeactivated + (RuleFlowGroupDeactivatedEvent event) + { + if (logger.isDebugEnabled()) + { + logger.debug("afterRuleFlowGroupDeactivated: " + getFullName() + + ": AgendaEventListener.afterRuleFlowGroupDeactivated(" + + event + ")"); + } + } + + /** + * {@inheritDoc} + */ + @Override + public void agendaGroupPopped(AgendaGroupPoppedEvent event) + { + if (logger.isDebugEnabled()) + { + logger.debug("agendaGroupPopped: " + getFullName() + + ": AgendaEventListener.agendaGroupPopped(" + + event + ")"); + } + } + + /** + * {@inheritDoc} + */ + @Override + public void agendaGroupPushed(AgendaGroupPushedEvent event) + { + if (logger.isDebugEnabled()) + { + logger.debug("agendaGroupPushed: " + getFullName() + + ": AgendaEventListener.agendaGroupPushed(" + + event + ")"); + } + } + + /** + * {@inheritDoc} + */ + @Override + public void beforeMatchFired(BeforeMatchFiredEvent event) + { + if (logger.isDebugEnabled()) + { + logger.debug("beforeMatchFired: " + getFullName() + + ": AgendaEventListener.beforeMatchFired(" + + event + ")"); + } + } + + /** + * {@inheritDoc} + */ + @Override + public void beforeRuleFlowGroupActivated + (RuleFlowGroupActivatedEvent event) + { + if (logger.isDebugEnabled()) + { + logger.debug("beforeRuleFlowGroupActivated: " + getFullName() + + ": AgendaEventListener.beforeRuleFlowGroupActivated(" + + event + ")"); + } + } + + /** + * {@inheritDoc} + */ + @Override + public void beforeRuleFlowGroupDeactivated + (RuleFlowGroupDeactivatedEvent event) + { + if (logger.isDebugEnabled()) + { + logger.debug("beforeRuleFlowGroupDeactivated: " + getFullName() + + ": AgendaEventListener.beforeRuleFlowGroupDeactivated(" + + event + ")"); + } + } + + /** + * {@inheritDoc} + */ + @Override + public void matchCancelled(MatchCancelledEvent event) + { + if (logger.isDebugEnabled()) + { + logger.debug("matchCancelled: " + getFullName() + + ": AgendaEventListener.matchCancelled(" + event + ")"); + } + } + + /** + * {@inheritDoc} + */ + @Override + public void matchCreated(MatchCreatedEvent event) + { + if (logger.isDebugEnabled()) + { + logger.debug("matchCreated: " + getFullName() + + ": AgendaEventListener.matchCreated(" + event + ")"); + } + } + + /****************************************/ + /* 'RuleRuntimeEventListener' interface */ + /****************************************/ + + /** + * {@inheritDoc} + */ + @Override + public void objectDeleted(ObjectDeletedEvent event) + { + if (logger.isDebugEnabled()) + { + logger.debug("objectDeleted: " + getFullName() + + ": AgendaEventListener.objectDeleted(" + event + ")"); + } + } + + /** + * {@inheritDoc} + */ + @Override + public void objectInserted(ObjectInsertedEvent event) + { + if (logger.isDebugEnabled()) + { + logger.debug("objectInserted: " + getFullName() + + ": AgendaEventListener.objectInserted(" + event + ")"); + } + } + + /** + * {@inheritDoc} + */ + @Override + public void objectUpdated(ObjectUpdatedEvent event) + { + if (logger.isDebugEnabled()) + { + logger.debug("objectUpdated: " + getFullName() + + ": AgendaEventListener.objectUpdated(" + event + ")"); + } + } + + /* ============================================================ */ + + /** + * This interface helps support the ability for features to choose the + * thread or threads that processes the 'KieSession'. + */ + public interface ThreadModel + { + /** + * Start the thread or threads that do the 'KieSession' processing + */ + public void start(); + + /** + * Stop the thread or threads that do the 'KieSession' processing + */ + public void stop(); + + /** + * This method is called to notify the running session that + * 'KieContainer.updateToVersion(...)' has been called (meaning the + * full name of this session has changed). + */ + default public void updated() {} + } + + /* ============================================================ */ + + /** + * This 'ThreadModel' variant uses 'KieSession.fireUntilHalt()'. + */ + public static class DefaultThreadModel implements Runnable,ThreadModel + { + // session associated with this persistent thread + PolicySession session; + + // the session thread + Thread thread; + + // controls whether the thread loops or terminates + volatile boolean repeat = true; + + /** + * Constructor - initialize 'session' and create thread + * + * @param session the 'PolicySession' instance + */ + public DefaultThreadModel(PolicySession session) + { + this.session = session; + thread = new Thread(this,getThreadName()); + } + + /** + * @return the String to use as the thread name + */ + private String getThreadName() + { + return("Session " + session.getFullName()); + } + + /***************************/ + /* 'ThreadModel' interface */ + /***************************/ + + /** + * {@inheritDoc} + */ + @Override + public void start() + { + repeat = true; + thread.start(); + } + + /** + * {@inheritDoc} + */ + @Override + public void stop() + { + repeat = false; + + // this should cause the thread to exit + session.getKieSession().halt(); + try + { + // wait up to 10 seconds for the thread to stop + thread.join(10000); + + // one more interrupt, just in case the 'kieSession.halt()' + // didn't work for some reason + thread.interrupt(); + } + catch (Exception e) + { + logger.error("stopThread in thread.join error"); + } + } + + /** + * {@inheritDoc} + */ + @Override + public void updated() + { + // the container artifact has been updated -- adjust the thread name + thread.setName(getThreadName()); + } + + /************************/ + /* 'Runnable' interface */ + /************************/ + + /** + * {@inheritDoc} + */ + @Override + public void run() + { + // set thread local variable + session.setPolicySession(); + + // We want to continue looping, despite any exceptions that occur + // while rules are fired. + KieSession kieSession = session.getKieSession(); + while (repeat) + { + try + { + kieSession.fireUntilHalt(); + + // if we fall through, it means 'KieSession.halt()' was called, + // but this may be a result of 'KieScanner' doing an update + } + catch (Exception | LinkageError e) + { + logger.error("startThread error in kieSession.fireUntilHalt", e); + } + } + logger.info("fireUntilHalt() returned"); + } + } +} diff --git a/policy-core/src/main/java/org/onap/policy/drools/core/PolicySessionFeatureAPI.java b/policy-core/src/main/java/org/onap/policy/drools/core/PolicySessionFeatureAPI.java new file mode 100644 index 00000000..6777eb59 --- /dev/null +++ b/policy-core/src/main/java/org/onap/policy/drools/core/PolicySessionFeatureAPI.java @@ -0,0 +1,107 @@ +/*- + * ============LICENSE_START======================================================= + * policy-core + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.drools.core; + +import org.kie.api.runtime.KieSession; +import org.onap.policy.drools.utils.OrderedService; +import org.onap.policy.drools.utils.OrderedServiceImpl; + +/** + * This interface provides a way to invoke optional features at various + * points in the code. At appropriate points in the + * application, the code iterates through this list, invoking these optional + * methods. Most of the methods here are notification only -- these tend to + * return a 'void' value. In other cases, such as 'activatePolicySession', + * may + */ +public interface PolicySessionFeatureAPI extends OrderedService +{ + /** + * 'FeatureAPI.impl.getList()' returns an ordered list of objects + * implementing the 'FeatureAPI' interface. + */ + static public OrderedServiceImpl<PolicySessionFeatureAPI> impl = + new OrderedServiceImpl<PolicySessionFeatureAPI>(PolicySessionFeatureAPI.class); + + /** + * This method is called during initialization at a point right after + * 'PolicyContainer' initialization has completed. + * + * @param args standard 'main' arguments, which are currently ignored + * @param configDir the relative directory containing configuration files + */ + default public void globalInit(String args[], String configDir) {} + + /** + * This method is used to create a 'KieSession' as part of a + * 'PolicyContainer'. The caller of this method will iterate over the + * implementers of this interface until one returns a non-null value. + * + * @param policyContainer the 'PolicyContainer' instance containing this + * session + * @param name the name of the KieSession (which is also the name of + * the associated PolicySession) + * @param kieBaseName the name of the 'KieBase' instance containing + * this session + * @return a new KieSession, if one was created, or 'null' if not + * (this depends on the capabilities and state of the object implementing + * this interface) + */ + default public KieSession activatePolicySession + (PolicyContainer policyContainer, String name, String kieBaseName) + { + return(null); + } + + /** + * This method is called after a new 'PolicySession' has been initialized, + * and linked to the 'PolicyContainer'. + * + * @param policySession the new 'PolicySession' instance + */ + default public void newPolicySession(PolicySession policySession) {} + + /** + * This method is called to select the 'ThreadModel' instance associated + * with a 'PolicySession' instance. + */ + default public PolicySession.ThreadModel selectThreadModel + (PolicySession session) + { + return(null); + } + + /** + * This method is called after 'KieSession.dispose()' is called + * + * @param policySession the 'PolicySession' object that wrapped the + * 'KieSession' + */ + default public void disposeKieSession(PolicySession policySession) {} + + /** + * This method is called after 'KieSession.destroy()' is called + * + * @param policySession the 'PolicySession' object that wrapped the + * 'KieSession' + */ + default public void destroyKieSession(PolicySession policySession) {} +} diff --git a/policy-core/src/main/java/org/onap/policy/drools/core/jmx/PdpJmx.java b/policy-core/src/main/java/org/onap/policy/drools/core/jmx/PdpJmx.java new file mode 100644 index 00000000..d3cf2e9d --- /dev/null +++ b/policy-core/src/main/java/org/onap/policy/drools/core/jmx/PdpJmx.java @@ -0,0 +1,47 @@ +/*- + * ============LICENSE_START======================================================= + * policy-core + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.drools.core.jmx; + +import java.util.concurrent.atomic.AtomicLong; + +public class PdpJmx implements PdpJmxMBean { + + private static PdpJmx instance = new PdpJmx(); + private final AtomicLong updates = new AtomicLong(); + private final AtomicLong actions = new AtomicLong(); + + public static PdpJmx getInstance() { + return instance; + } + + public long getUpdates(){ + return updates.longValue(); + } + public long getRulesFired(){ + return actions.longValue(); + } + public void updateOccured(){ + updates.incrementAndGet(); + } + public void ruleFired(){ + actions.incrementAndGet(); + } +} diff --git a/policy-core/src/main/java/org/onap/policy/drools/core/jmx/PdpJmxListener.java b/policy-core/src/main/java/org/onap/policy/drools/core/jmx/PdpJmxListener.java new file mode 100644 index 00000000..ceb7049e --- /dev/null +++ b/policy-core/src/main/java/org/onap/policy/drools/core/jmx/PdpJmxListener.java @@ -0,0 +1,67 @@ +/*- + * ============LICENSE_START======================================================= + * policy-core + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.drools.core.jmx; + +import java.lang.management.ManagementFactory; + +import javax.management.InstanceAlreadyExistsException; +import javax.management.InstanceNotFoundException; +import javax.management.MBeanRegistrationException; +import javax.management.MBeanServer; +import javax.management.MalformedObjectNameException; +import javax.management.NotCompliantMBeanException; +import javax.management.ObjectName; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class PdpJmxListener { + + public static final Logger logger = LoggerFactory.getLogger(PdpJmxListener.class); + + public static void stop() { + final MBeanServer server = ManagementFactory.getPlatformMBeanServer(); + try { + server.unregisterMBean(new ObjectName("PolicyEngine:type=PdpJmx")); + } catch (MBeanRegistrationException | InstanceNotFoundException + | MalformedObjectNameException e) { + logger.error("PdpJmxListener.stop(): " + + "Could not unregister PolicyEngine:type=PdpJmx MBean " + + "with the MBean server", e); + } + + } + + + public static void start() { + final MBeanServer server = ManagementFactory.getPlatformMBeanServer(); + try { + server.registerMBean(PdpJmx.getInstance(), new ObjectName("PolicyEngine:type=PdpJmx")); + } catch (InstanceAlreadyExistsException | MBeanRegistrationException + | NotCompliantMBeanException | MalformedObjectNameException e) { + logger.error("PdpJmxListener.start(): " + + "Could not unregister PolicyEngine:type=PdpJmx MBean " + + "with the MBean server", e); + } + + } + +} diff --git a/policy-core/src/main/java/org/onap/policy/drools/core/jmx/PdpJmxMBean.java b/policy-core/src/main/java/org/onap/policy/drools/core/jmx/PdpJmxMBean.java new file mode 100644 index 00000000..37a9e4dc --- /dev/null +++ b/policy-core/src/main/java/org/onap/policy/drools/core/jmx/PdpJmxMBean.java @@ -0,0 +1,27 @@ +/*- + * ============LICENSE_START======================================================= + * policy-core + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.drools.core.jmx; + +public interface PdpJmxMBean { + + public long getRulesFired(); + public long getUpdates(); +} |