aboutsummaryrefslogtreecommitdiffstats
path: root/policy-core/src/main/java/org/onap/policy/drools/core/PolicySession.java
diff options
context:
space:
mode:
authorGuo Ruijing <ruijing.guo@intel.com>2017-07-28 08:23:01 +0000
committerGuo Ruijing <ruijing.guo@intel.com>2017-07-28 08:23:30 +0000
commit6abeb297254942c48722c2da0e7c355d523fe307 (patch)
treef2b006ec6ca8804633e2f74a6f1b40c90683f1ea /policy-core/src/main/java/org/onap/policy/drools/core/PolicySession.java
parentd1d749ae390c276fc10c4985d0080f0a9ff7ff35 (diff)
[POLICY-72] replace openecomp for drools-pdp
Change-Id: I8aa8e32d3ba10f7c655b50e97aaf6865514d4777 Signed-off-by: Guo Ruijing <ruijing.guo@intel.com>
Diffstat (limited to 'policy-core/src/main/java/org/onap/policy/drools/core/PolicySession.java')
-rw-r--r--policy-core/src/main/java/org/onap/policy/drools/core/PolicySession.java576
1 files changed, 576 insertions, 0 deletions
diff --git a/policy-core/src/main/java/org/onap/policy/drools/core/PolicySession.java b/policy-core/src/main/java/org/onap/policy/drools/core/PolicySession.java
new file mode 100644
index 00000000..77e16804
--- /dev/null
+++ b/policy-core/src/main/java/org/onap/policy/drools/core/PolicySession.java
@@ -0,0 +1,576 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * policy-core
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.core;
+
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.kie.api.event.rule.AfterMatchFiredEvent;
+import org.kie.api.event.rule.AgendaEventListener;
+import org.kie.api.event.rule.AgendaGroupPoppedEvent;
+import org.kie.api.event.rule.AgendaGroupPushedEvent;
+import org.kie.api.event.rule.BeforeMatchFiredEvent;
+import org.kie.api.event.rule.MatchCancelledEvent;
+import org.kie.api.event.rule.MatchCreatedEvent;
+import org.kie.api.event.rule.ObjectDeletedEvent;
+import org.kie.api.event.rule.ObjectInsertedEvent;
+import org.kie.api.event.rule.ObjectUpdatedEvent;
+import org.kie.api.event.rule.RuleFlowGroupActivatedEvent;
+import org.kie.api.event.rule.RuleFlowGroupDeactivatedEvent;
+import org.kie.api.event.rule.RuleRuntimeEventListener;
+import org.kie.api.runtime.KieSession;
+import org.onap.policy.drools.core.jmx.PdpJmx;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * This class is a wrapper around 'KieSession', which adds the following:
+ *
+ * 1) A thread running 'KieSession.fireUntilHalt()'
+ * 2) Access to UEB
+ * 3) Logging of events
+ */
+public class PolicySession
+ implements AgendaEventListener, RuleRuntimeEventListener
+{
+ // get an instance of logger
+ private static Logger logger = LoggerFactory.getLogger(PolicySession.class);
+ // name of the 'PolicySession' and associated 'KieSession'
+ private String name;
+
+ // the associated 'PolicyContainer', which may have additional
+ // 'PolicySession' instances in addition to this one
+ private PolicyContainer container;
+
+ // maps feature objects to per-PolicyContainer data
+ private ConcurrentHashMap<Object, Object> adjuncts =
+ new ConcurrentHashMap<Object, Object>();
+
+ // associated 'KieSession' instance
+ private KieSession kieSession;
+
+ // if not 'null', this is the thread model processing the 'KieSession'
+ private ThreadModel threadModel = null;
+
+ // supports 'getCurrentSession()' method
+ static private ThreadLocal<PolicySession> policySession =
+ new ThreadLocal<PolicySession>();
+
+ /**
+ * Internal constructor - create a 'PolicySession' instance
+ *
+ * @param name the name of this 'PolicySession' (and 'kieSession')
+ * @param container the 'PolicyContainer' instance containing this session
+ * @param kieSession the associated 'KieSession' instance
+ */
+ protected PolicySession(String name,
+ PolicyContainer container, KieSession kieSession)
+ {
+ this.name = name;
+ this.container = container;
+ this.kieSession = kieSession;
+ kieSession.addEventListener((AgendaEventListener)this);
+ kieSession.addEventListener((RuleRuntimeEventListener)this);
+ }
+
+ /**
+ * @return the 'PolicyContainer' object containing this session
+ */
+ public PolicyContainer getPolicyContainer()
+ {
+ return(container);
+ }
+
+ /**
+ * @return the associated 'KieSession' instance
+ */
+ public KieSession getKieSession()
+ {
+ return(kieSession);
+ }
+
+ /**
+ * @return the local name of this session, which should either match the
+ * name specified in 'kmodule.xml' file associated with this session, or the
+ * name passed on the 'PolicyContainer.adoptKieSession' method.
+ */
+ public String getName()
+ {
+ return(name);
+ }
+
+ /**
+ * @return the 'PolicyContainer' name, followed by ':', followed by the
+ * local name of the session. It should be useful in log messages.
+ */
+ public String getFullName()
+ {
+ return(container.getName() + ":" + name);
+ }
+
+ /**
+ * If no 'ThreadModel' is currently running, this method will create one,
+ * and invoke it's 'start()' method. Features implementing
+ * 'PolicySessionFeatureAPI.selectThreadModel(...)' get a chance to create
+ * the ThreadModel instance.
+ */
+ public synchronized void startThread()
+ {
+ if (threadModel == null)
+ {
+ // loop through all of the features, and give each one
+ // a chance to create the 'ThreadModel'
+ for (PolicySessionFeatureAPI feature :
+ PolicySessionFeatureAPI.impl.getList())
+ {
+ try
+ {
+ if ((threadModel = feature.selectThreadModel(this)) != null)
+ break;
+ }
+ catch (Exception e)
+ {
+ logger.error("ERROR: Feature API: "
+ + feature.getClass().getName(), e);
+ }
+ }
+ if (threadModel == null)
+ {
+ // no feature created a ThreadModel -- select the default
+ threadModel = new DefaultThreadModel(this);
+ }
+ logger.info("starting ThreadModel for session " + getFullName());
+ threadModel.start();
+ }
+ }
+
+ /**
+ * If a 'ThreadModel' is currently running, this calls the 'stop()' method,
+ * and sets the 'threadModel' reference to 'null'.
+ */
+ public synchronized void stopThread()
+ {
+ if (threadModel != null)
+ {
+ threadModel.stop();
+ threadModel = null;
+ }
+ }
+
+ /**
+ * Notification that 'updateToVersion' was called on the container
+ */
+ void updated()
+ {
+ if (threadModel != null)
+ {
+ // notify the 'ThreadModel', which may change one or more Thread names
+ threadModel.updated();
+ }
+ }
+
+ /**
+ * Set this 'PolicySession' instance as the one associated with the
+ * currently-running thread.
+ */
+ public void setPolicySession()
+ {
+ // this sets a 'ThreadLocal' variable
+ policySession.set(this);
+ }
+
+ /**
+ * @return the 'PolicySession' instance associated with the current thread
+ * (Note that this only works if the current thread is the one running
+ * 'kieSession.fireUntilHalt()'.)
+ */
+ public static PolicySession getCurrentSession()
+ {
+ return(policySession.get());
+ }
+
+ /**
+ * Fetch the adjunct object associated with a given feature
+ *
+ * @param object this is typically the singleton feature object that is
+ * used as a key, but it might also be useful to use nested objects
+ * within the feature as keys.
+ * @return a feature-specific object associated with the key, or 'null'
+ * if it is not found.
+ */
+ public Object getAdjunct(Object object)
+ {
+ return(adjuncts.get(object));
+ }
+
+ /**
+ * Store the adjunct object associated with a given feature
+ *
+ * @param object this is typically the singleton feature object that is
+ * used as a key, but it might also be useful to use nested objects
+ * within the feature as keys.
+ * @param value a feature-specific object associated with the key, or 'null'
+ * if the feature-specific object should be removed
+ */
+ public void setAdjunct(Object object, Object value)
+ {
+ if (value == null)
+ {
+ adjuncts.remove(object);
+ }
+ else
+ {
+ adjuncts.put(object, value);
+ }
+ }
+
+ /***********************************/
+ /* 'AgendaEventListener' interface */
+ /***********************************/
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void afterMatchFired(AfterMatchFiredEvent event)
+ {
+ if (logger.isDebugEnabled())
+ {
+ logger.debug("afterMatchFired: " + getFullName()
+ + ": AgendaEventListener.afterMatchFired(" + event + ")");
+ }
+ PdpJmx.getInstance().ruleFired();
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void afterRuleFlowGroupActivated(RuleFlowGroupActivatedEvent event)
+ {
+ if (logger.isDebugEnabled())
+ {
+ logger.debug("afterRuleFlowGroupActivated: " + getFullName()
+ + ": AgendaEventListener.afterRuleFlowGroupActivated("
+ + event + ")");
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void afterRuleFlowGroupDeactivated
+ (RuleFlowGroupDeactivatedEvent event)
+ {
+ if (logger.isDebugEnabled())
+ {
+ logger.debug("afterRuleFlowGroupDeactivated: " + getFullName()
+ + ": AgendaEventListener.afterRuleFlowGroupDeactivated("
+ + event + ")");
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void agendaGroupPopped(AgendaGroupPoppedEvent event)
+ {
+ if (logger.isDebugEnabled())
+ {
+ logger.debug("agendaGroupPopped: " + getFullName()
+ + ": AgendaEventListener.agendaGroupPopped("
+ + event + ")");
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void agendaGroupPushed(AgendaGroupPushedEvent event)
+ {
+ if (logger.isDebugEnabled())
+ {
+ logger.debug("agendaGroupPushed: " + getFullName()
+ + ": AgendaEventListener.agendaGroupPushed("
+ + event + ")");
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void beforeMatchFired(BeforeMatchFiredEvent event)
+ {
+ if (logger.isDebugEnabled())
+ {
+ logger.debug("beforeMatchFired: " + getFullName()
+ + ": AgendaEventListener.beforeMatchFired("
+ + event + ")");
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void beforeRuleFlowGroupActivated
+ (RuleFlowGroupActivatedEvent event)
+ {
+ if (logger.isDebugEnabled())
+ {
+ logger.debug("beforeRuleFlowGroupActivated: " + getFullName()
+ + ": AgendaEventListener.beforeRuleFlowGroupActivated("
+ + event + ")");
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void beforeRuleFlowGroupDeactivated
+ (RuleFlowGroupDeactivatedEvent event)
+ {
+ if (logger.isDebugEnabled())
+ {
+ logger.debug("beforeRuleFlowGroupDeactivated: " + getFullName()
+ + ": AgendaEventListener.beforeRuleFlowGroupDeactivated("
+ + event + ")");
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void matchCancelled(MatchCancelledEvent event)
+ {
+ if (logger.isDebugEnabled())
+ {
+ logger.debug("matchCancelled: " + getFullName()
+ + ": AgendaEventListener.matchCancelled(" + event + ")");
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void matchCreated(MatchCreatedEvent event)
+ {
+ if (logger.isDebugEnabled())
+ {
+ logger.debug("matchCreated: " + getFullName()
+ + ": AgendaEventListener.matchCreated(" + event + ")");
+ }
+ }
+
+ /****************************************/
+ /* 'RuleRuntimeEventListener' interface */
+ /****************************************/
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void objectDeleted(ObjectDeletedEvent event)
+ {
+ if (logger.isDebugEnabled())
+ {
+ logger.debug("objectDeleted: " + getFullName()
+ + ": AgendaEventListener.objectDeleted(" + event + ")");
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void objectInserted(ObjectInsertedEvent event)
+ {
+ if (logger.isDebugEnabled())
+ {
+ logger.debug("objectInserted: " + getFullName()
+ + ": AgendaEventListener.objectInserted(" + event + ")");
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void objectUpdated(ObjectUpdatedEvent event)
+ {
+ if (logger.isDebugEnabled())
+ {
+ logger.debug("objectUpdated: " + getFullName()
+ + ": AgendaEventListener.objectUpdated(" + event + ")");
+ }
+ }
+
+ /* ============================================================ */
+
+ /**
+ * This interface helps support the ability for features to choose the
+ * thread or threads that processes the 'KieSession'.
+ */
+ public interface ThreadModel
+ {
+ /**
+ * Start the thread or threads that do the 'KieSession' processing
+ */
+ public void start();
+
+ /**
+ * Stop the thread or threads that do the 'KieSession' processing
+ */
+ public void stop();
+
+ /**
+ * This method is called to notify the running session that
+ * 'KieContainer.updateToVersion(...)' has been called (meaning the
+ * full name of this session has changed).
+ */
+ default public void updated() {}
+ }
+
+ /* ============================================================ */
+
+ /**
+ * This 'ThreadModel' variant uses 'KieSession.fireUntilHalt()'.
+ */
+ public static class DefaultThreadModel implements Runnable,ThreadModel
+ {
+ // session associated with this persistent thread
+ PolicySession session;
+
+ // the session thread
+ Thread thread;
+
+ // controls whether the thread loops or terminates
+ volatile boolean repeat = true;
+
+ /**
+ * Constructor - initialize 'session' and create thread
+ *
+ * @param session the 'PolicySession' instance
+ */
+ public DefaultThreadModel(PolicySession session)
+ {
+ this.session = session;
+ thread = new Thread(this,getThreadName());
+ }
+
+ /**
+ * @return the String to use as the thread name
+ */
+ private String getThreadName()
+ {
+ return("Session " + session.getFullName());
+ }
+
+ /***************************/
+ /* 'ThreadModel' interface */
+ /***************************/
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void start()
+ {
+ repeat = true;
+ thread.start();
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void stop()
+ {
+ repeat = false;
+
+ // this should cause the thread to exit
+ session.getKieSession().halt();
+ try
+ {
+ // wait up to 10 seconds for the thread to stop
+ thread.join(10000);
+
+ // one more interrupt, just in case the 'kieSession.halt()'
+ // didn't work for some reason
+ thread.interrupt();
+ }
+ catch (Exception e)
+ {
+ logger.error("stopThread in thread.join error");
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void updated()
+ {
+ // the container artifact has been updated -- adjust the thread name
+ thread.setName(getThreadName());
+ }
+
+ /************************/
+ /* 'Runnable' interface */
+ /************************/
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void run()
+ {
+ // set thread local variable
+ session.setPolicySession();
+
+ // We want to continue looping, despite any exceptions that occur
+ // while rules are fired.
+ KieSession kieSession = session.getKieSession();
+ while (repeat)
+ {
+ try
+ {
+ kieSession.fireUntilHalt();
+
+ // if we fall through, it means 'KieSession.halt()' was called,
+ // but this may be a result of 'KieScanner' doing an update
+ }
+ catch (Exception | LinkageError e)
+ {
+ logger.error("startThread error in kieSession.fireUntilHalt", e);
+ }
+ }
+ logger.info("fireUntilHalt() returned");
+ }
+ }
+}