diff options
Diffstat (limited to 'policy-core/src')
-rw-r--r-- | policy-core/src/main/java/org/onap/policy/drools/core/PolicyContainer.java | 132 | ||||
-rw-r--r-- | policy-core/src/main/java/org/onap/policy/drools/core/PolicySession.java | 993 |
2 files changed, 536 insertions, 589 deletions
diff --git a/policy-core/src/main/java/org/onap/policy/drools/core/PolicyContainer.java b/policy-core/src/main/java/org/onap/policy/drools/core/PolicyContainer.java index 1e8a2882..8a86a5c5 100644 --- a/policy-core/src/main/java/org/onap/policy/drools/core/PolicyContainer.java +++ b/policy-core/src/main/java/org/onap/policy/drools/core/PolicyContainer.java @@ -3,6 +3,7 @@ * policy-core * ================================================================================ * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved. + * Modifications Copyright (C) 2018 Samsung Electronics Co., Ltd. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -237,42 +238,44 @@ public class PolicyContainer implements Startable { synchronized (sessions) { logger.info("activatePolicySession:name :" + name); PolicySession session = sessions.get(name); - if (session == null) { - KieSession kieSession = null; + if (session != null) { + logger.info("activatePolicySession:session - " + session.getFullName() + " is returned."); + return session; + } + KieSession kieSession = null; - // loop through all of the features, and give each one - // a chance to create the 'KieSession' - for (PolicySessionFeatureAPI feature : PolicySessionFeatureAPI.impl.getList()) { - try { - if ((kieSession = feature.activatePolicySession(this, name, kieBaseName)) != null) { - break; - } - } catch (Exception e) { - logger.error(ERROR_STRING + feature.getClass().getName(), e); + // loop through all of the features, and give each one + // a chance to create the 'KieSession' + for (PolicySessionFeatureAPI feature : PolicySessionFeatureAPI.impl.getList()) { + try { + if ((kieSession = feature.activatePolicySession(this, name, kieBaseName)) != null) { + break; } + } catch (Exception e) { + logger.error(ERROR_STRING + feature.getClass().getName(), e); } + } - // if none of the features created the session, create one now - if (kieSession == null) { - kieSession = kieContainer.newKieSession(name); - } + // if none of the features created the session, create one now + if (kieSession == null) { + kieSession = kieContainer.newKieSession(name); + } + + if (kieSession != null) { + // creation of 'KieSession' was successful - build + // a PolicySession + session = new PolicySession(name, this, kieSession); + sessions.put(name, session); - if (kieSession != null) { - // creation of 'KieSession' was successful - build - // a PolicySession - session = new PolicySession(name, this, kieSession); - sessions.put(name, session); - - // notify features - for (PolicySessionFeatureAPI feature : PolicySessionFeatureAPI.impl.getList()) { - try { - feature.newPolicySession(session); - } catch (Exception e) { - logger.error(ERROR_STRING + feature.getClass().getName(), e); - } + // notify features + for (PolicySessionFeatureAPI feature : PolicySessionFeatureAPI.impl.getList()) { + try { + feature.newPolicySession(session); + } catch (Exception e) { + logger.error(ERROR_STRING + feature.getClass().getName(), e); } - logger.info("activatePolicySession:new session was added in sessions with name " + name); } + logger.info("activatePolicySession:new session was added in sessions with name " + name); } logger.info("activatePolicySession:session - " + (session == null ? "null" : session.getFullName()) + " is returned."); @@ -497,21 +500,23 @@ public class PolicyContainer implements Startable { */ @Override public synchronized boolean start() { - if (!isStarted) { - // This will create all 'PolicySession' instances specified in the - // 'kmodule.xml' file that don't exist yet - for (String kieBaseName : kieContainer.getKieBaseNames()) { - for (String kieSessionName : kieContainer.getKieSessionNamesInKieBase(kieBaseName)) { - // if the 'PolicySession' does not currently exist, this method - // call will attempt to create it - PolicySession session = activatePolicySession(kieSessionName, kieBaseName); - if (session != null) { - session.startThread(); - } + if (isStarted) { + return true; + } + + // This will create all 'PolicySession' instances specified in the + // 'kmodule.xml' file that don't exist yet + for (String kieBaseName : kieContainer.getKieBaseNames()) { + for (String kieSessionName : kieContainer.getKieSessionNamesInKieBase(kieBaseName)) { + // if the 'PolicySession' does not currently exist, this method + // call will attempt to create it + PolicySession session = activatePolicySession(kieSessionName, kieBaseName); + if (session != null) { + session.startThread(); } } - isStarted = true; } + isStarted = true; return true; } @@ -520,34 +525,37 @@ public class PolicyContainer implements Startable { */ @Override public synchronized boolean stop() { - if (isStarted) { - Collection<PolicySession> localSessions; + if (!isStarted) { + return true; + } - synchronized (sessions) { - // local set containing all of the sessions - localSessions = new HashSet<>(sessions.values()); + Collection<PolicySession> localSessions; - // clear the 'name->session' map in 'PolicyContainer' - sessions.clear(); - } - for (PolicySession session : localSessions) { - // stop session thread - session.stopThread(); + synchronized (sessions) { + // local set containing all of the sessions + localSessions = new HashSet<>(sessions.values()); - // free KieSession resources - session.getKieSession().dispose(); + // clear the 'name->session' map in 'PolicyContainer' + sessions.clear(); + } + for (PolicySession session : localSessions) { + // stop session thread + session.stopThread(); - // notify features - for (PolicySessionFeatureAPI feature : PolicySessionFeatureAPI.impl.getList()) { - try { - feature.disposeKieSession(session); - } catch (Exception e) { - logger.error(ERROR_STRING + feature.getClass().getName(), e); - } + // free KieSession resources + session.getKieSession().dispose(); + + // notify features + for (PolicySessionFeatureAPI feature : PolicySessionFeatureAPI.impl.getList()) { + try { + feature.disposeKieSession(session); + } catch (Exception e) { + logger.error(ERROR_STRING + feature.getClass().getName(), e); } } - isStarted = false; } + isStarted = false; + return true; } diff --git a/policy-core/src/main/java/org/onap/policy/drools/core/PolicySession.java b/policy-core/src/main/java/org/onap/policy/drools/core/PolicySession.java index 431050f5..7646225e 100644 --- a/policy-core/src/main/java/org/onap/policy/drools/core/PolicySession.java +++ b/policy-core/src/main/java/org/onap/policy/drools/core/PolicySession.java @@ -3,13 +3,14 @@ * policy-core * ================================================================================ * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved. + * Modifications Copyright (C) 2018 Samsung Electronics Co., Ltd. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -43,534 +44,472 @@ import org.slf4j.LoggerFactory; /** * This class is a wrapper around 'KieSession', which adds the following: - * + * <p> * 1) A thread running 'KieSession.fireUntilHalt()' * 2) Access to UEB * 3) Logging of events */ public class PolicySession - implements AgendaEventListener, RuleRuntimeEventListener -{ - // get an instance of logger - private static Logger logger = LoggerFactory.getLogger(PolicySession.class); - // name of the 'PolicySession' and associated 'KieSession' - private String name; - - // the associated 'PolicyContainer', which may have additional - // 'PolicySession' instances in addition to this one - private PolicyContainer container; - - // maps feature objects to per-PolicyContainer data - private ConcurrentHashMap<Object, Object> adjuncts = - new ConcurrentHashMap<>(); - - // associated 'KieSession' instance - private KieSession kieSession; - - // if not 'null', this is the thread model processing the 'KieSession' - private ThreadModel threadModel = null; - - // supports 'getCurrentSession()' method - private static ThreadLocal<PolicySession> policySess = - new ThreadLocal<>(); - - /** - * Internal constructor - create a 'PolicySession' instance - * - * @param name the name of this 'PolicySession' (and 'kieSession') - * @param container the 'PolicyContainer' instance containing this session - * @param kieSession the associated 'KieSession' instance - */ - protected PolicySession(String name, - PolicyContainer container, KieSession kieSession) - { - this.name = name; - this.container = container; - this.kieSession = kieSession; - kieSession.addEventListener((AgendaEventListener)this); - kieSession.addEventListener((RuleRuntimeEventListener)this); - } - - /** - * @return the 'PolicyContainer' object containing this session - */ - public PolicyContainer getPolicyContainer() - { - return container; - } - - /** - * @return the associated 'KieSession' instance - */ - public KieSession getKieSession() - { - return kieSession; - } - - /** - * @return the local name of this session, which should either match the - * name specified in 'kmodule.xml' file associated with this session, or the - * name passed on the 'PolicyContainer.adoptKieSession' method. - */ - public String getName() - { - return name; - } - - /** - * @return the 'PolicyContainer' name, followed by ':', followed by the - * local name of the session. It should be useful in log messages. - */ - public String getFullName() - { - return container.getName() + ":" + name; - } - - /** - * If no 'ThreadModel' is currently running, this method will create one, - * and invoke it's 'start()' method. Features implementing - * 'PolicySessionFeatureAPI.selectThreadModel(...)' get a chance to create - * the ThreadModel instance. - */ - public synchronized void startThread() - { - if (threadModel == null) - { - // loop through all of the features, and give each one - // a chance to create the 'ThreadModel' - for (PolicySessionFeatureAPI feature : - PolicySessionFeatureAPI.impl.getList()) - { - try - { - if ((threadModel = feature.selectThreadModel(this)) != null) - break; - } - catch (Exception e) - { - logger.error("ERROR: Feature API: " - + feature.getClass().getName(), e); - } - } - if (threadModel == null) - { - // no feature created a ThreadModel -- select the default - threadModel = new DefaultThreadModel(this); - } - logger.info("starting ThreadModel for session " + getFullName()); - threadModel.start(); - } - } - - /** - * If a 'ThreadModel' is currently running, this calls the 'stop()' method, - * and sets the 'threadModel' reference to 'null'. - */ - public synchronized void stopThread() - { - if (threadModel != null) - { - threadModel.stop(); - threadModel = null; - } - } - - /** - * Notification that 'updateToVersion' was called on the container - */ - void updated() - { - if (threadModel != null) - { - // notify the 'ThreadModel', which may change one or more Thread names - threadModel.updated(); - } - } - - /** - * Set this 'PolicySession' instance as the one associated with the - * currently-running thread. - */ - public void setPolicySession() - { - // this sets a 'ThreadLocal' variable - policySess.set(this); - } - - /** - * @return the 'PolicySession' instance associated with the current thread - * (Note that this only works if the current thread is the one running - * 'kieSession.fireUntilHalt()'.) - */ - public static PolicySession getCurrentSession() - { - return policySess.get(); - } - - /** - * Fetch the adjunct object associated with a given feature - * - * @param object this is typically the singleton feature object that is - * used as a key, but it might also be useful to use nested objects - * within the feature as keys. - * @return a feature-specific object associated with the key, or 'null' - * if it is not found. - */ - public Object getAdjunct(Object object) - { - return adjuncts.get(object); - } - - /** - * Store the adjunct object associated with a given feature - * - * @param object this is typically the singleton feature object that is - * used as a key, but it might also be useful to use nested objects - * within the feature as keys. - * @param value a feature-specific object associated with the key, or 'null' - * if the feature-specific object should be removed - */ - public void setAdjunct(Object object, Object value) - { - if (value == null) - { - adjuncts.remove(object); - } - else - { - adjuncts.put(object, value); - } - } - - /***********************************/ - /* 'AgendaEventListener' interface */ - /***********************************/ - - /** - * {@inheritDoc} - */ - @Override - public void afterMatchFired(AfterMatchFiredEvent event) - { - if (logger.isDebugEnabled()) - { - logger.debug("afterMatchFired: " + getFullName() - + ": AgendaEventListener.afterMatchFired(" + event + ")"); - } - PdpJmx.getInstance().ruleFired(); - } - - /** - * {@inheritDoc} - */ - @Override - public void afterRuleFlowGroupActivated(RuleFlowGroupActivatedEvent event) - { - if (logger.isDebugEnabled()) - { - logger.debug("afterRuleFlowGroupActivated: " + getFullName() - + ": AgendaEventListener.afterRuleFlowGroupActivated(" - + event + ")"); - } - } - - /** - * {@inheritDoc} - */ - @Override - public void afterRuleFlowGroupDeactivated - (RuleFlowGroupDeactivatedEvent event) - { - if (logger.isDebugEnabled()) - { - logger.debug("afterRuleFlowGroupDeactivated: " + getFullName() - + ": AgendaEventListener.afterRuleFlowGroupDeactivated(" - + event + ")"); - } - } - - /** - * {@inheritDoc} - */ - @Override - public void agendaGroupPopped(AgendaGroupPoppedEvent event) - { - if (logger.isDebugEnabled()) - { - logger.debug("agendaGroupPopped: " + getFullName() - + ": AgendaEventListener.agendaGroupPopped(" - + event + ")"); - } - } - - /** - * {@inheritDoc} - */ - @Override - public void agendaGroupPushed(AgendaGroupPushedEvent event) - { - if (logger.isDebugEnabled()) - { - logger.debug("agendaGroupPushed: " + getFullName() - + ": AgendaEventListener.agendaGroupPushed(" - + event + ")"); - } - } - - /** - * {@inheritDoc} - */ - @Override - public void beforeMatchFired(BeforeMatchFiredEvent event) - { - if (logger.isDebugEnabled()) - { - logger.debug("beforeMatchFired: " + getFullName() - + ": AgendaEventListener.beforeMatchFired(" - + event + ")"); - } - } - - /** - * {@inheritDoc} - */ - @Override - public void beforeRuleFlowGroupActivated - (RuleFlowGroupActivatedEvent event) - { - if (logger.isDebugEnabled()) - { - logger.debug("beforeRuleFlowGroupActivated: " + getFullName() - + ": AgendaEventListener.beforeRuleFlowGroupActivated(" - + event + ")"); - } - } - - /** - * {@inheritDoc} - */ - @Override - public void beforeRuleFlowGroupDeactivated - (RuleFlowGroupDeactivatedEvent event) - { - if (logger.isDebugEnabled()) - { - logger.debug("beforeRuleFlowGroupDeactivated: " + getFullName() - + ": AgendaEventListener.beforeRuleFlowGroupDeactivated(" - + event + ")"); - } - } - - /** - * {@inheritDoc} - */ - @Override - public void matchCancelled(MatchCancelledEvent event) - { - if (logger.isDebugEnabled()) - { - logger.debug("matchCancelled: " + getFullName() - + ": AgendaEventListener.matchCancelled(" + event + ")"); - } - } - - /** - * {@inheritDoc} - */ - @Override - public void matchCreated(MatchCreatedEvent event) - { - if (logger.isDebugEnabled()) - { - logger.debug("matchCreated: " + getFullName() - + ": AgendaEventListener.matchCreated(" + event + ")"); - } - } - - /****************************************/ - /* 'RuleRuntimeEventListener' interface */ - /****************************************/ - - /** - * {@inheritDoc} - */ - @Override - public void objectDeleted(ObjectDeletedEvent event) - { - if (logger.isDebugEnabled()) - { - logger.debug("objectDeleted: " + getFullName() - + ": AgendaEventListener.objectDeleted(" + event + ")"); - } - } - - /** - * {@inheritDoc} - */ - @Override - public void objectInserted(ObjectInsertedEvent event) - { - if (logger.isDebugEnabled()) - { - logger.debug("objectInserted: " + getFullName() - + ": AgendaEventListener.objectInserted(" + event + ")"); - } - } - - /** - * {@inheritDoc} - */ - @Override - public void objectUpdated(ObjectUpdatedEvent event) - { - if (logger.isDebugEnabled()) - { - logger.debug("objectUpdated: " + getFullName() - + ": AgendaEventListener.objectUpdated(" + event + ")"); - } - } - - /* ============================================================ */ - - /** - * This interface helps support the ability for features to choose the - * thread or threads that processes the 'KieSession'. - */ - public interface ThreadModel - { - /** - * Start the thread or threads that do the 'KieSession' processing - */ - public void start(); - - /** - * Stop the thread or threads that do the 'KieSession' processing - */ - public void stop(); - - /** - * This method is called to notify the running session that - * 'KieContainer.updateToVersion(...)' has been called (meaning the - * full name of this session has changed). - */ - public default void updated() {} - } - - /* ============================================================ */ - - /** - * This 'ThreadModel' variant uses 'KieSession.fireUntilHalt()'. - */ - public static class DefaultThreadModel implements Runnable,ThreadModel - { - // session associated with this persistent thread - PolicySession session; - - // the session thread - Thread thread; - - // controls whether the thread loops or terminates - volatile boolean repeat = true; - - /** - * Constructor - initialize 'session' and create thread - * - * @param session the 'PolicySession' instance - */ - public DefaultThreadModel(PolicySession session) - { - this.session = session; - thread = new Thread(this,getThreadName()); - } - - /** - * @return the String to use as the thread name - */ - private String getThreadName() - { - return "Session " + session.getFullName(); - } - - /***************************/ - /* 'ThreadModel' interface */ - /***************************/ - - /** - * {@inheritDoc} - */ - @Override - public void start() - { - repeat = true; - thread.start(); - } - - /** - * {@inheritDoc} - */ - @Override - public void stop() - { - repeat = false; - - // this should cause the thread to exit - session.getKieSession().halt(); - try - { - // wait up to 10 seconds for the thread to stop - thread.join(10000); - - // one more interrupt, just in case the 'kieSession.halt()' - // didn't work for some reason - thread.interrupt(); - } - catch (Exception e) - { - logger.error("stopThread in thread.join error", e); - } - } - - /** - * {@inheritDoc} - */ - @Override - public void updated() - { - // the container artifact has been updated -- adjust the thread name - thread.setName(getThreadName()); - } - - /************************/ - /* 'Runnable' interface */ - /************************/ - - /** - * {@inheritDoc} - */ - @Override - public void run() - { - // set thread local variable - session.setPolicySession(); - - // We want to continue looping, despite any exceptions that occur - // while rules are fired. - KieSession kieSession1 = session.getKieSession(); - while (repeat) - { - try - { - kieSession1.fireUntilHalt(); - - // if we fall through, it means 'kieSession1.halt()' was called, - // but this may be a result of 'KieScanner' doing an update - } - catch (Exception | LinkageError e) - { - logger.error("startThread error in kieSession1.fireUntilHalt", e); - } - } - logger.info("fireUntilHalt() returned"); - } - } + implements AgendaEventListener, RuleRuntimeEventListener { + // get an instance of logger + private static Logger logger = LoggerFactory.getLogger(PolicySession.class); + // name of the 'PolicySession' and associated 'KieSession' + private String name; + + // the associated 'PolicyContainer', which may have additional + // 'PolicySession' instances in addition to this one + private PolicyContainer container; + + // maps feature objects to per-PolicyContainer data + private ConcurrentHashMap<Object, Object> adjuncts = + new ConcurrentHashMap<>(); + + // associated 'KieSession' instance + private KieSession kieSession; + + // if not 'null', this is the thread model processing the 'KieSession' + private ThreadModel threadModel = null; + + // supports 'getCurrentSession()' method + private static ThreadLocal<PolicySession> policySess = + new ThreadLocal<>(); + + /** + * Internal constructor - create a 'PolicySession' instance + * + * @param name the name of this 'PolicySession' (and 'kieSession') + * @param container the 'PolicyContainer' instance containing this session + * @param kieSession the associated 'KieSession' instance + */ + protected PolicySession(String name, + PolicyContainer container, KieSession kieSession) { + this.name = name; + this.container = container; + this.kieSession = kieSession; + kieSession.addEventListener((AgendaEventListener) this); + kieSession.addEventListener((RuleRuntimeEventListener) this); + } + + /** + * @return the 'PolicyContainer' object containing this session + */ + public PolicyContainer getPolicyContainer() { + return container; + } + + /** + * @return the associated 'KieSession' instance + */ + public KieSession getKieSession() { + return kieSession; + } + + /** + * @return the local name of this session, which should either match the + * name specified in 'kmodule.xml' file associated with this session, or the + * name passed on the 'PolicyContainer.adoptKieSession' method. + */ + public String getName() { + return name; + } + + /** + * @return the 'PolicyContainer' name, followed by ':', followed by the + * local name of the session. It should be useful in log messages. + */ + public String getFullName() { + return container.getName() + ":" + name; + } + + /** + * If no 'ThreadModel' is currently running, this method will create one, + * and invoke it's 'start()' method. Features implementing + * 'PolicySessionFeatureAPI.selectThreadModel(...)' get a chance to create + * the ThreadModel instance. + */ + public synchronized void startThread() { + if (threadModel != null) { + return; + } + + // loop through all of the features, and give each one + // a chance to create the 'ThreadModel' + for (PolicySessionFeatureAPI feature : + PolicySessionFeatureAPI.impl.getList()) { + try { + if ((threadModel = feature.selectThreadModel(this)) != null) + break; + } catch (Exception e) { + logger.error("ERROR: Feature API: " + + feature.getClass().getName(), e); + } + } + if (threadModel == null) { + // no feature created a ThreadModel -- select the default + threadModel = new DefaultThreadModel(this); + } + logger.info("starting ThreadModel for session " + getFullName()); + threadModel.start(); + } + + /** + * If a 'ThreadModel' is currently running, this calls the 'stop()' method, + * and sets the 'threadModel' reference to 'null'. + */ + public synchronized void stopThread() { + if (threadModel != null) { + threadModel.stop(); + threadModel = null; + } + } + + /** + * Notification that 'updateToVersion' was called on the container + */ + void updated() { + if (threadModel != null) { + // notify the 'ThreadModel', which may change one or more Thread names + threadModel.updated(); + } + } + + /** + * Set this 'PolicySession' instance as the one associated with the + * currently-running thread. + */ + public void setPolicySession() { + // this sets a 'ThreadLocal' variable + policySess.set(this); + } + + /** + * @return the 'PolicySession' instance associated with the current thread + * (Note that this only works if the current thread is the one running + * 'kieSession.fireUntilHalt()'.) + */ + public static PolicySession getCurrentSession() { + return policySess.get(); + } + + /** + * Fetch the adjunct object associated with a given feature + * + * @param object this is typically the singleton feature object that is + * used as a key, but it might also be useful to use nested objects + * within the feature as keys. + * @return a feature-specific object associated with the key, or 'null' + * if it is not found. + */ + public Object getAdjunct(Object object) { + return adjuncts.get(object); + } + + /** + * Store the adjunct object associated with a given feature + * + * @param object this is typically the singleton feature object that is + * used as a key, but it might also be useful to use nested objects + * within the feature as keys. + * @param value a feature-specific object associated with the key, or 'null' + * if the feature-specific object should be removed + */ + public void setAdjunct(Object object, Object value) { + if (value == null) { + adjuncts.remove(object); + } else { + adjuncts.put(object, value); + } + } + + /***********************************/ + /* 'AgendaEventListener' interface */ + /***********************************/ + + /** + * {@inheritDoc} + */ + @Override + public void afterMatchFired(AfterMatchFiredEvent event) { + if (logger.isDebugEnabled()) { + logger.debug("afterMatchFired: " + getFullName() + + ": AgendaEventListener.afterMatchFired(" + event + ")"); + } + PdpJmx.getInstance().ruleFired(); + } + + /** + * {@inheritDoc} + */ + @Override + public void afterRuleFlowGroupActivated(RuleFlowGroupActivatedEvent event) { + if (logger.isDebugEnabled()) { + logger.debug("afterRuleFlowGroupActivated: " + getFullName() + + ": AgendaEventListener.afterRuleFlowGroupActivated(" + + event + ")"); + } + } + + /** + * {@inheritDoc} + */ + @Override + public void afterRuleFlowGroupDeactivated + (RuleFlowGroupDeactivatedEvent event) { + if (logger.isDebugEnabled()) { + logger.debug("afterRuleFlowGroupDeactivated: " + getFullName() + + ": AgendaEventListener.afterRuleFlowGroupDeactivated(" + + event + ")"); + } + } + + /** + * {@inheritDoc} + */ + @Override + public void agendaGroupPopped(AgendaGroupPoppedEvent event) { + if (logger.isDebugEnabled()) { + logger.debug("agendaGroupPopped: " + getFullName() + + ": AgendaEventListener.agendaGroupPopped(" + + event + ")"); + } + } + + /** + * {@inheritDoc} + */ + @Override + public void agendaGroupPushed(AgendaGroupPushedEvent event) { + if (logger.isDebugEnabled()) { + logger.debug("agendaGroupPushed: " + getFullName() + + ": AgendaEventListener.agendaGroupPushed(" + + event + ")"); + } + } + + /** + * {@inheritDoc} + */ + @Override + public void beforeMatchFired(BeforeMatchFiredEvent event) { + if (logger.isDebugEnabled()) { + logger.debug("beforeMatchFired: " + getFullName() + + ": AgendaEventListener.beforeMatchFired(" + + event + ")"); + } + } + + /** + * {@inheritDoc} + */ + @Override + public void beforeRuleFlowGroupActivated + (RuleFlowGroupActivatedEvent event) { + if (logger.isDebugEnabled()) { + logger.debug("beforeRuleFlowGroupActivated: " + getFullName() + + ": AgendaEventListener.beforeRuleFlowGroupActivated(" + + event + ")"); + } + } + + /** + * {@inheritDoc} + */ + @Override + public void beforeRuleFlowGroupDeactivated + (RuleFlowGroupDeactivatedEvent event) { + if (logger.isDebugEnabled()) { + logger.debug("beforeRuleFlowGroupDeactivated: " + getFullName() + + ": AgendaEventListener.beforeRuleFlowGroupDeactivated(" + + event + ")"); + } + } + + /** + * {@inheritDoc} + */ + @Override + public void matchCancelled(MatchCancelledEvent event) { + if (logger.isDebugEnabled()) { + logger.debug("matchCancelled: " + getFullName() + + ": AgendaEventListener.matchCancelled(" + event + ")"); + } + } + + /** + * {@inheritDoc} + */ + @Override + public void matchCreated(MatchCreatedEvent event) { + if (logger.isDebugEnabled()) { + logger.debug("matchCreated: " + getFullName() + + ": AgendaEventListener.matchCreated(" + event + ")"); + } + } + + /****************************************/ + /* 'RuleRuntimeEventListener' interface */ + /****************************************/ + + /** + * {@inheritDoc} + */ + @Override + public void objectDeleted(ObjectDeletedEvent event) { + if (logger.isDebugEnabled()) { + logger.debug("objectDeleted: " + getFullName() + + ": AgendaEventListener.objectDeleted(" + event + ")"); + } + } + + /** + * {@inheritDoc} + */ + @Override + public void objectInserted(ObjectInsertedEvent event) { + if (logger.isDebugEnabled()) { + logger.debug("objectInserted: " + getFullName() + + ": AgendaEventListener.objectInserted(" + event + ")"); + } + } + + /** + * {@inheritDoc} + */ + @Override + public void objectUpdated(ObjectUpdatedEvent event) { + if (logger.isDebugEnabled()) { + logger.debug("objectUpdated: " + getFullName() + + ": AgendaEventListener.objectUpdated(" + event + ")"); + } + } + + /* ============================================================ */ + + /** + * This interface helps support the ability for features to choose the + * thread or threads that processes the 'KieSession'. + */ + public interface ThreadModel { + /** + * Start the thread or threads that do the 'KieSession' processing + */ + public void start(); + + /** + * Stop the thread or threads that do the 'KieSession' processing + */ + public void stop(); + + /** + * This method is called to notify the running session that + * 'KieContainer.updateToVersion(...)' has been called (meaning the + * full name of this session has changed). + */ + public default void updated() { + } + } + + /* ============================================================ */ + + /** + * This 'ThreadModel' variant uses 'KieSession.fireUntilHalt()'. + */ + public static class DefaultThreadModel implements Runnable, ThreadModel { + // session associated with this persistent thread + PolicySession session; + + // the session thread + Thread thread; + + // controls whether the thread loops or terminates + volatile boolean repeat = true; + + /** + * Constructor - initialize 'session' and create thread + * + * @param session the 'PolicySession' instance + */ + public DefaultThreadModel(PolicySession session) { + this.session = session; + thread = new Thread(this, getThreadName()); + } + + /** + * @return the String to use as the thread name + */ + private String getThreadName() { + return "Session " + session.getFullName(); + } + + /***************************/ + /* 'ThreadModel' interface */ + /***************************/ + + /** + * {@inheritDoc} + */ + @Override + public void start() { + repeat = true; + thread.start(); + } + + /** + * {@inheritDoc} + */ + @Override + public void stop() { + repeat = false; + + // this should cause the thread to exit + session.getKieSession().halt(); + try { + // wait up to 10 seconds for the thread to stop + thread.join(10000); + + // one more interrupt, just in case the 'kieSession.halt()' + // didn't work for some reason + thread.interrupt(); + } catch (Exception e) { + logger.error("stopThread in thread.join error", e); + } + } + + /** + * {@inheritDoc} + */ + @Override + public void updated() { + // the container artifact has been updated -- adjust the thread name + thread.setName(getThreadName()); + } + + /************************/ + /* 'Runnable' interface */ + /************************/ + + /** + * {@inheritDoc} + */ + @Override + public void run() { + // set thread local variable + session.setPolicySession(); + + // We want to continue looping, despite any exceptions that occur + // while rules are fired. + KieSession kieSession1 = session.getKieSession(); + while (repeat) { + try { + kieSession1.fireUntilHalt(); + + // if we fall through, it means 'kieSession1.halt()' was called, + // but this may be a result of 'KieScanner' doing an update + } catch (Exception | LinkageError e) { + logger.error("startThread error in kieSession1.fireUntilHalt", e); + } + } + logger.info("fireUntilHalt() returned"); + } + } } |