aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--policy-core/src/main/java/org/openecomp/policy/drools/core/PolicyContainer.java10
-rw-r--r--policy-core/src/main/java/org/openecomp/policy/drools/core/PolicySession.java275
-rw-r--r--policy-core/src/main/java/org/openecomp/policy/drools/core/PolicySessionFeatureAPI.java10
3 files changed, 217 insertions, 78 deletions
diff --git a/policy-core/src/main/java/org/openecomp/policy/drools/core/PolicyContainer.java b/policy-core/src/main/java/org/openecomp/policy/drools/core/PolicyContainer.java
index fca30e00..26a80558 100644
--- a/policy-core/src/main/java/org/openecomp/policy/drools/core/PolicyContainer.java
+++ b/policy-core/src/main/java/org/openecomp/policy/drools/core/PolicyContainer.java
@@ -454,7 +454,15 @@ public class PolicyContainer implements Startable
}else {
logger.info("updateToVersion:releaseId " + releaseId.toString());
}
- return(kieContainer.updateToVersion(releaseId));
+
+ // notify all 'PolicySession' instances
+ Results results = kieContainer.updateToVersion(releaseId);
+ for (PolicySession session : sessions.values())
+ {
+ session.updated();
+ }
+
+ return(results);
}
/**
diff --git a/policy-core/src/main/java/org/openecomp/policy/drools/core/PolicySession.java b/policy-core/src/main/java/org/openecomp/policy/drools/core/PolicySession.java
index ae9dbc45..4d717f5d 100644
--- a/policy-core/src/main/java/org/openecomp/policy/drools/core/PolicySession.java
+++ b/policy-core/src/main/java/org/openecomp/policy/drools/core/PolicySession.java
@@ -67,8 +67,8 @@ public class PolicySession
// associated 'KieSession' instance
private KieSession kieSession;
- // if not 'null', this is the thread running 'kieSession.fireUntilHalt()'
- private Thread thread = null;
+ // if not 'null', this is the thread model processing the 'KieSession'
+ private ThreadModel threadModel = null;
// supports 'getCurrentSession()' method
static private ThreadLocal<PolicySession> policySession =
@@ -127,100 +127,77 @@ public class PolicySession
}
/**
- * this starts a separate thread, which invokes 'KieSession.fireUntilHalt()'.
- * It does nothing if the thread already exists.
+ * If no 'ThreadModel' is currently running, this method will create one,
+ * and invoke it's 'start()' method. Features implementing
+ * 'PolicySessionFeatureAPI.selectThreadModel(...)' get a chance to create
+ * the ThreadModel instance.
*/
public synchronized void startThread()
{
- if (thread == null)
+ if (threadModel == null)
{
- logger.info("startThread with name " + getFullName());
- thread = new Thread("Session " + getFullName())
+ // loop through all of the features, and give each one
+ // a chance to create the 'ThreadModel'
+ for (PolicySessionFeatureAPI feature :
+ PolicySessionFeatureAPI.impl.getList())
{
- public void run()
+ try
{
- // set thread local variable
- policySession.set(PolicySession.this);
-
- // We want to continue, despite any exceptions that occur
- // while rules are fired.
- boolean repeat = true;
- long minSleepTime = 100;
- long maxSleepTime = 5000;
- long sleepTime = maxSleepTime;
- while (repeat)
- {
- if(this.isInterrupted()){
- break;
- }
- try
- {
- if (kieSession.fireAllRules() > 0)
- {
- // some rules fired -- reduce poll delay
- if (sleepTime > minSleepTime)
- {
- sleepTime /= 2;
- if (sleepTime < minSleepTime)
- {
- sleepTime = minSleepTime;
- }
- }
- }
- else
- {
- // no rules fired -- increase poll delay
- if (sleepTime < maxSleepTime)
- {
- sleepTime *= 2;
- if (sleepTime > maxSleepTime)
- {
- sleepTime = maxSleepTime;
- }
- }
- }
- }
- catch (Throwable e)
- {
- logger.error(MessageCodes.EXCEPTION_ERROR, e, "startThread", "kieSession.fireUntilHalt");
- }
- try {
- Thread.sleep(sleepTime);
- } catch (InterruptedException e) {
- break;
- }
- }
- logger.info("fireUntilHalt() returned");
+ if ((threadModel = feature.selectThreadModel(this)) != null)
+ break;
}
- };
- thread.start();
+ catch (Exception e)
+ {
+ logger.error("ERROR: Feature API: "
+ + feature.getClass().getName(), e);
+ }
+ }
+ if (threadModel == null)
+ {
+ // no feature created a ThreadModel -- select the default
+ threadModel = new DefaultThreadModel(this);
+ }
+ logger.info("starting ThreadModel for session " + getFullName());
+ threadModel.start();
}
}
/**
- * if a thread is currently running, this invokes 'KieSession.halt()' to
- * stop it.
+ * If a 'ThreadModel' is currently running, this calls the 'stop()' method,
+ * and sets the 'threadModel' reference to 'null'.
*/
public synchronized void stopThread()
{
- if (thread != null)
+ if (threadModel != null)
{
- // this should cause the thread to exit
- thread.interrupt();
- try
- {
- // wait for the thread to stop
- thread.join();
- }
- catch (Exception e)
- {
- logger.error(MessageCodes.EXCEPTION_ERROR, e, "stopThread", "thread.join");
- }
- thread = null;
+ threadModel.stop();
+ threadModel = null;
}
}
/**
+ * Notification that 'updateToVersion' was called on the container
+ */
+ void updated()
+ {
+ if (threadModel != null)
+ {
+ // notify the 'ThreadModel', which may change one or more Thread names
+ threadModel.updated();
+ }
+ }
+
+ /**
+ * Set this 'PolicySession' instance as the one associated with the
+ * currently-running thread.
+ */
+ public void setPolicySession()
+ {
+ // this sets a 'ThreadLocal' variable
+ policySession.set(this);
+ }
+
+ /**
* @return the 'PolicySession' instance associated with the current thread
* (Note that this only works if the current thread is the one running
* 'kieSession.fireUntilHalt()'.)
@@ -452,4 +429,148 @@ public class PolicySession
+ ": AgendaEventListener.objectUpdated(" + event + ")");
}
}
+
+ /* ============================================================ */
+
+ /**
+ * This interface helps support the ability for features to choose the
+ * thread or threads that processes the 'KieSession'.
+ */
+ public interface ThreadModel
+ {
+ /**
+ * Start the thread or threads that do the 'KieSession' processing
+ */
+ public void start();
+
+ /**
+ * Stop the thread or threads that do the 'KieSession' processing
+ */
+ public void stop();
+
+ /**
+ * This method is called to notify the running session that
+ * 'KieContainer.updateToVersion(...)' has been called (meaning the
+ * full name of this session has changed).
+ */
+ default public void updated() {}
+ }
+
+ /* ============================================================ */
+
+ /**
+ * This 'ThreadModel' variant uses 'KieSession.fireUntilHalt()'.
+ */
+ public static class DefaultThreadModel implements Runnable,ThreadModel
+ {
+ // session associated with this persistent thread
+ PolicySession session;
+
+ // the session thread
+ Thread thread;
+
+ // controls whether the thread loops or terminates
+ volatile boolean repeat = true;
+
+ /**
+ * Constructor - initialize 'session' and create thread
+ *
+ * @param session the 'PolicySession' instance
+ */
+ public DefaultThreadModel(PolicySession session)
+ {
+ this.session = session;
+ thread = new Thread(this,getThreadName());
+ }
+
+ /**
+ * @return the String to use as the thread name
+ */
+ private String getThreadName()
+ {
+ return("Session " + session.getFullName());
+ }
+
+ /***************************/
+ /* 'ThreadModel' interface */
+ /***************************/
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void start()
+ {
+ repeat = true;
+ thread.start();
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void stop()
+ {
+ repeat = false;
+
+ // this should cause the thread to exit
+ session.getKieSession().halt();
+ try
+ {
+ // wait up to 10 seconds for the thread to stop
+ thread.join(10000);
+
+ // one more interrupt, just in case the 'kieSession.halt()'
+ // didn't work for some reason
+ thread.interrupt();
+ }
+ catch (Exception e)
+ {
+ logger.error(MessageCodes.EXCEPTION_ERROR, e, "stopThread", "thread.join");
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void updated()
+ {
+ // the container artifact has been updated -- adjust the thread name
+ thread.setName(getThreadName());
+ }
+
+ /************************/
+ /* 'Runnable' interface */
+ /************************/
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void run()
+ {
+ // set thread local variable
+ session.setPolicySession();
+
+ // We want to continue looping, despite any exceptions that occur
+ // while rules are fired.
+ KieSession kieSession = session.getKieSession();
+ while (repeat)
+ {
+ try
+ {
+ kieSession.fireUntilHalt();
+
+ // if we fall through, it means 'KieSession.halt()' was called
+ repeat = false;
+ }
+ catch (Throwable e)
+ {
+ logger.error(MessageCodes.EXCEPTION_ERROR, e, "startThread", "kieSession.fireUntilHalt");
+ }
+ }
+ logger.info("fireUntilHalt() returned");
+ }
+ }
}
diff --git a/policy-core/src/main/java/org/openecomp/policy/drools/core/PolicySessionFeatureAPI.java b/policy-core/src/main/java/org/openecomp/policy/drools/core/PolicySessionFeatureAPI.java
index da828db3..8792372b 100644
--- a/policy-core/src/main/java/org/openecomp/policy/drools/core/PolicySessionFeatureAPI.java
+++ b/policy-core/src/main/java/org/openecomp/policy/drools/core/PolicySessionFeatureAPI.java
@@ -80,6 +80,16 @@ public interface PolicySessionFeatureAPI extends OrderedService
default public void newPolicySession(PolicySession policySession) {}
/**
+ * This method is called to select the 'ThreadModel' instance associated
+ * with a 'PolicySession' instance.
+ */
+ default public PolicySession.ThreadModel selectThreadModel
+ (PolicySession session)
+ {
+ return(null);
+ }
+
+ /**
* This method is called after 'KieSession.dispose()' is called
*
* @param policySession the 'PolicySession' object that wrapped the