From 3cc1a85a832771cb70ed9cbaab9031bc4a114308 Mon Sep 17 00:00:00 2001 From: Ralph Straubs Date: Wed, 14 Jun 2017 03:56:05 -0500 Subject: [POLICY-21] PDP-D gets stuck during shutdown The fix for this problem is included in the following enhancements: 1) Define a new nested interface 'PolicySession.ThreadModel', which makes it possible for features to control the thread or threads processing a 'KieSession'. The nested class 'PolicySession.DefaultThreadModel' implements the default version, which uses 'KieSession.fireUntilHalt()' instead of polling 'KieSession.fireAllRules()'. A new method 'selectThreadModel(PolicySession session)' has been added to 'PolicySessionFeatureAPI' to enable this selection. 2) Update thread names when 'KieContainer.updateToVersion(...)' is called Change-Id: Ic48089fe5660501e2e3d42b87501697211a9d0fe Signed-off-by: Ralph Straubs --- .../policy/drools/core/PolicyContainer.java | 10 +- .../policy/drools/core/PolicySession.java | 275 +++++++++++++++------ .../drools/core/PolicySessionFeatureAPI.java | 10 + 3 files changed, 217 insertions(+), 78 deletions(-) (limited to 'policy-core/src') diff --git a/policy-core/src/main/java/org/openecomp/policy/drools/core/PolicyContainer.java b/policy-core/src/main/java/org/openecomp/policy/drools/core/PolicyContainer.java index fca30e00..26a80558 100644 --- a/policy-core/src/main/java/org/openecomp/policy/drools/core/PolicyContainer.java +++ b/policy-core/src/main/java/org/openecomp/policy/drools/core/PolicyContainer.java @@ -454,7 +454,15 @@ public class PolicyContainer implements Startable }else { logger.info("updateToVersion:releaseId " + releaseId.toString()); } - return(kieContainer.updateToVersion(releaseId)); + + // notify all 'PolicySession' instances + Results results = kieContainer.updateToVersion(releaseId); + for (PolicySession session : sessions.values()) + { + session.updated(); + } + + return(results); } /** diff --git a/policy-core/src/main/java/org/openecomp/policy/drools/core/PolicySession.java b/policy-core/src/main/java/org/openecomp/policy/drools/core/PolicySession.java index ae9dbc45..4d717f5d 100644 --- a/policy-core/src/main/java/org/openecomp/policy/drools/core/PolicySession.java +++ b/policy-core/src/main/java/org/openecomp/policy/drools/core/PolicySession.java @@ -67,8 +67,8 @@ public class PolicySession // associated 'KieSession' instance private KieSession kieSession; - // if not 'null', this is the thread running 'kieSession.fireUntilHalt()' - private Thread thread = null; + // if not 'null', this is the thread model processing the 'KieSession' + private ThreadModel threadModel = null; // supports 'getCurrentSession()' method static private ThreadLocal policySession = @@ -127,99 +127,76 @@ public class PolicySession } /** - * this starts a separate thread, which invokes 'KieSession.fireUntilHalt()'. - * It does nothing if the thread already exists. + * If no 'ThreadModel' is currently running, this method will create one, + * and invoke it's 'start()' method. Features implementing + * 'PolicySessionFeatureAPI.selectThreadModel(...)' get a chance to create + * the ThreadModel instance. */ public synchronized void startThread() { - if (thread == null) + if (threadModel == null) { - logger.info("startThread with name " + getFullName()); - thread = new Thread("Session " + getFullName()) + // loop through all of the features, and give each one + // a chance to create the 'ThreadModel' + for (PolicySessionFeatureAPI feature : + PolicySessionFeatureAPI.impl.getList()) { - public void run() + try { - // set thread local variable - policySession.set(PolicySession.this); - - // We want to continue, despite any exceptions that occur - // while rules are fired. - boolean repeat = true; - long minSleepTime = 100; - long maxSleepTime = 5000; - long sleepTime = maxSleepTime; - while (repeat) - { - if(this.isInterrupted()){ - break; - } - try - { - if (kieSession.fireAllRules() > 0) - { - // some rules fired -- reduce poll delay - if (sleepTime > minSleepTime) - { - sleepTime /= 2; - if (sleepTime < minSleepTime) - { - sleepTime = minSleepTime; - } - } - } - else - { - // no rules fired -- increase poll delay - if (sleepTime < maxSleepTime) - { - sleepTime *= 2; - if (sleepTime > maxSleepTime) - { - sleepTime = maxSleepTime; - } - } - } - } - catch (Throwable e) - { - logger.error(MessageCodes.EXCEPTION_ERROR, e, "startThread", "kieSession.fireUntilHalt"); - } - try { - Thread.sleep(sleepTime); - } catch (InterruptedException e) { - break; - } - } - logger.info("fireUntilHalt() returned"); + if ((threadModel = feature.selectThreadModel(this)) != null) + break; } - }; - thread.start(); + catch (Exception e) + { + logger.error("ERROR: Feature API: " + + feature.getClass().getName(), e); + } + } + if (threadModel == null) + { + // no feature created a ThreadModel -- select the default + threadModel = new DefaultThreadModel(this); + } + logger.info("starting ThreadModel for session " + getFullName()); + threadModel.start(); } } /** - * if a thread is currently running, this invokes 'KieSession.halt()' to - * stop it. + * If a 'ThreadModel' is currently running, this calls the 'stop()' method, + * and sets the 'threadModel' reference to 'null'. */ public synchronized void stopThread() { - if (thread != null) + if (threadModel != null) { - // this should cause the thread to exit - thread.interrupt(); - try - { - // wait for the thread to stop - thread.join(); - } - catch (Exception e) - { - logger.error(MessageCodes.EXCEPTION_ERROR, e, "stopThread", "thread.join"); - } - thread = null; + threadModel.stop(); + threadModel = null; } } + /** + * Notification that 'updateToVersion' was called on the container + */ + void updated() + { + if (threadModel != null) + { + // notify the 'ThreadModel', which may change one or more Thread names + threadModel.updated(); + } + } + + /** + * Set this 'PolicySession' instance as the one associated with the + * currently-running thread. + */ + public void setPolicySession() + { + // this sets a 'ThreadLocal' variable + policySession.set(this); + } + /** * @return the 'PolicySession' instance associated with the current thread * (Note that this only works if the current thread is the one running @@ -452,4 +429,148 @@ public class PolicySession + ": AgendaEventListener.objectUpdated(" + event + ")"); } } + + /* ============================================================ */ + + /** + * This interface helps support the ability for features to choose the + * thread or threads that processes the 'KieSession'. + */ + public interface ThreadModel + { + /** + * Start the thread or threads that do the 'KieSession' processing + */ + public void start(); + + /** + * Stop the thread or threads that do the 'KieSession' processing + */ + public void stop(); + + /** + * This method is called to notify the running session that + * 'KieContainer.updateToVersion(...)' has been called (meaning the + * full name of this session has changed). + */ + default public void updated() {} + } + + /* ============================================================ */ + + /** + * This 'ThreadModel' variant uses 'KieSession.fireUntilHalt()'. + */ + public static class DefaultThreadModel implements Runnable,ThreadModel + { + // session associated with this persistent thread + PolicySession session; + + // the session thread + Thread thread; + + // controls whether the thread loops or terminates + volatile boolean repeat = true; + + /** + * Constructor - initialize 'session' and create thread + * + * @param session the 'PolicySession' instance + */ + public DefaultThreadModel(PolicySession session) + { + this.session = session; + thread = new Thread(this,getThreadName()); + } + + /** + * @return the String to use as the thread name + */ + private String getThreadName() + { + return("Session " + session.getFullName()); + } + + /***************************/ + /* 'ThreadModel' interface */ + /***************************/ + + /** + * {@inheritDoc} + */ + @Override + public void start() + { + repeat = true; + thread.start(); + } + + /** + * {@inheritDoc} + */ + @Override + public void stop() + { + repeat = false; + + // this should cause the thread to exit + session.getKieSession().halt(); + try + { + // wait up to 10 seconds for the thread to stop + thread.join(10000); + + // one more interrupt, just in case the 'kieSession.halt()' + // didn't work for some reason + thread.interrupt(); + } + catch (Exception e) + { + logger.error(MessageCodes.EXCEPTION_ERROR, e, "stopThread", "thread.join"); + } + } + + /** + * {@inheritDoc} + */ + @Override + public void updated() + { + // the container artifact has been updated -- adjust the thread name + thread.setName(getThreadName()); + } + + /************************/ + /* 'Runnable' interface */ + /************************/ + + /** + * {@inheritDoc} + */ + @Override + public void run() + { + // set thread local variable + session.setPolicySession(); + + // We want to continue looping, despite any exceptions that occur + // while rules are fired. + KieSession kieSession = session.getKieSession(); + while (repeat) + { + try + { + kieSession.fireUntilHalt(); + + // if we fall through, it means 'KieSession.halt()' was called + repeat = false; + } + catch (Throwable e) + { + logger.error(MessageCodes.EXCEPTION_ERROR, e, "startThread", "kieSession.fireUntilHalt"); + } + } + logger.info("fireUntilHalt() returned"); + } + } } diff --git a/policy-core/src/main/java/org/openecomp/policy/drools/core/PolicySessionFeatureAPI.java b/policy-core/src/main/java/org/openecomp/policy/drools/core/PolicySessionFeatureAPI.java index da828db3..8792372b 100644 --- a/policy-core/src/main/java/org/openecomp/policy/drools/core/PolicySessionFeatureAPI.java +++ b/policy-core/src/main/java/org/openecomp/policy/drools/core/PolicySessionFeatureAPI.java @@ -79,6 +79,16 @@ public interface PolicySessionFeatureAPI extends OrderedService */ default public void newPolicySession(PolicySession policySession) {} + /** + * This method is called to select the 'ThreadModel' instance associated + * with a 'PolicySession' instance. + */ + default public PolicySession.ThreadModel selectThreadModel + (PolicySession session) + { + return(null); + } + /** * This method is called after 'KieSession.dispose()' is called * -- cgit 1.2.3-korg