/*
* ============LICENSE_START=======================================================
* ONAP
* ================================================================================
* Copyright (C) 2017-2021 AT&T Intellectual Property. All rights reserved.
* Modifications Copyright (C) 2018 Samsung Electronics Co., Ltd.
* Modifications Copyright (C) 2024 Nordix Foundation.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* ============LICENSE_END=========================================================
*/
package org.onap.policy.drools.core;
import java.util.concurrent.ConcurrentHashMap;
import lombok.Getter;
import org.kie.api.event.rule.AfterMatchFiredEvent;
import org.kie.api.event.rule.AgendaEventListener;
import org.kie.api.event.rule.AgendaGroupPoppedEvent;
import org.kie.api.event.rule.AgendaGroupPushedEvent;
import org.kie.api.event.rule.BeforeMatchFiredEvent;
import org.kie.api.event.rule.MatchCancelledEvent;
import org.kie.api.event.rule.MatchCreatedEvent;
import org.kie.api.event.rule.ObjectDeletedEvent;
import org.kie.api.event.rule.ObjectInsertedEvent;
import org.kie.api.event.rule.ObjectUpdatedEvent;
import org.kie.api.event.rule.RuleFlowGroupActivatedEvent;
import org.kie.api.event.rule.RuleFlowGroupDeactivatedEvent;
import org.kie.api.event.rule.RuleRuntimeEventListener;
import org.kie.api.runtime.KieSession;
import org.onap.policy.drools.core.jmx.PdpJmx;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* This class is a wrapper around 'KieSession', which adds the following:
*
*
1) A thread running 'KieSession.fireUntilHalt()'
* 2) Access to topic
* 3) Logging of events
*/
public class PolicySession implements AgendaEventListener, RuleRuntimeEventListener {
// get an instance of logger
private static final Logger logger = LoggerFactory.getLogger(PolicySession.class);
// supports 'getCurrentSession()' method
private static ThreadLocal policySess = new ThreadLocal<>();
// name of the 'PolicySession' and associated 'KieSession'
@Getter
private String name;
// the associated 'PolicyContainer', which may have additional
// 'PolicySession' instances in addition to this one
@Getter
private PolicyContainer container;
// maps feature objects to per-PolicyContainer data
private ConcurrentHashMap adjuncts =
new ConcurrentHashMap<>();
// associated 'KieSession' instance
@Getter
private KieSession kieSession;
// if not 'null', this is the thread model processing the 'KieSession'
private ThreadModel threadModel = null;
/**
* Internal constructor - create a 'PolicySession' instance.
*
* @param name the name of this 'PolicySession' (and 'kieSession')
* @param container the 'PolicyContainer' instance containing this session
* @param kieSession the associated 'KieSession' instance
*/
protected PolicySession(String name,
PolicyContainer container, KieSession kieSession) {
this.name = name;
this.container = container;
this.kieSession = kieSession;
kieSession.addEventListener((AgendaEventListener) this);
kieSession.addEventListener((RuleRuntimeEventListener) this);
}
/**
* Get full name.
*
* @return the 'PolicyContainer' name, followed by ':', followed by the
* local name of the session. It should be useful in log messages.
*/
public String getFullName() {
return container.getName() + ":" + name;
}
/**
* If no 'ThreadModel' is currently running, this method will create one,
* and invoke it's 'start()' method. Features implementing
* 'PolicySessionFeatureAPI.selectThreadModel(...)' get a chance to create
* the ThreadModel instance.
*/
public synchronized void startThread() {
if (threadModel != null) {
return;
}
// loop through all the features, and give each one
// a chance to create the 'ThreadModel'
for (PolicySessionFeatureApi feature :
PolicySessionFeatureApiConstants.getImpl().getList()) {
try {
if ((threadModel = feature.selectThreadModel(this)) != null) {
break;
}
} catch (Exception e) {
logger.error("ERROR: Feature API: {}", feature.getClass().getName(), e);
}
}
if (threadModel == null) {
// no feature created a ThreadModel -- select the default
threadModel = new DefaultThreadModel(this);
}
logger.info("starting ThreadModel for session {}", getFullName());
threadModel.start();
}
/**
* If a 'ThreadModel' is currently running, this calls the 'stop()' method,
* and sets the 'threadModel' reference to 'null'.
*/
public synchronized void stopThread() {
if (threadModel != null) {
threadModel.stop();
threadModel = null;
}
}
/**
* Notification that 'updateToVersion' was called on the container.
*/
void updated() {
if (threadModel != null) {
// notify the 'ThreadModel', which may change one or more Thread names
threadModel.updated();
}
}
/**
* Set this 'PolicySession' instance as the one associated with the
* currently-running thread.
*/
public void setPolicySession() {
// this sets a 'ThreadLocal' variable
policySess.set(this);
}
/**
* Unset this 'PolicySession' instance as the one associated with the
* currently-running thread.
*/
public void removePolicySession() {
if (policySess.get() == this) {
policySess.remove();
}
}
/**
* Get current session.
*
* @return the 'PolicySession' instance associated with the current thread
* (Note that this only works if the current thread is the one running
* 'kieSession.fireUntilHalt()'.)
*/
public static PolicySession getCurrentSession() {
return policySess.get();
}
/**
* Fetch the adjunct object associated with a given feature.
*
* @param object this is typically the singleton feature object that is
* used as a key, but it might also be useful to use nested objects
* within the feature as keys.
* @return a feature-specific object associated with the key, or 'null'
* if it is not found.
*/
public Object getAdjunct(Object object) {
return adjuncts.get(object);
}
/**
* Store the adjunct object associated with a given feature.
*
* @param object this is typically the singleton feature object that is
* used as a key, but it might also be useful to use nested objects
* within the feature as keys.
* @param value a feature-specific object associated with the key, or 'null'
* if the feature-specific object should be removed
*/
public void setAdjunct(Object object, Object value) {
if (value == null) {
adjuncts.remove(object);
} else {
adjuncts.put(object, value);
}
}
/**
* This method will insert an object into the Drools memory associated
* with this 'PolicySession' instance. Features are given the opportunity
* to handle the insert, and a distributed host feature could use this to
* send the object to another host, and insert it in the corresponding
* Drools session.
*
* @param object the object to insert in Drools memory
*/
public void insertDrools(Object object) {
for (PolicySessionFeatureApi feature :
PolicySessionFeatureApiConstants.getImpl().getList()) {
if (feature.insertDrools(this, object)) {
// feature is performing the insert
return;
}
}
// no feature has intervened -- do the insert locally
if (kieSession != null) {
kieSession.insert(object);
}
}
/*=================================*/
/* 'AgendaEventListener' interface */
/*=================================*/
/**
* {@inheritDoc}.
*/
@Override
public void afterMatchFired(AfterMatchFiredEvent event) {
logger.debug("afterMatchFired: {}: AgendaEventListener.afterMatchFired({})", getFullName(), event);
PdpJmx.getInstance().ruleFired();
}
/**
* {@inheritDoc}.
*/
@Override
public void afterRuleFlowGroupActivated(RuleFlowGroupActivatedEvent event) {
logger.debug("afterRuleFlowGroupActivated: {}: AgendaEventListener.afterRuleFlowGroupActivated({})",
getFullName(), event);
}
/**
* {@inheritDoc}.
*/
@Override
public void afterRuleFlowGroupDeactivated(RuleFlowGroupDeactivatedEvent event) {
logger.debug("afterRuleFlowGroupDeactivated: {}: AgendaEventListener.afterRuleFlowGroupDeactivated({})",
getFullName(), event);
}
/**
* {@inheritDoc}.
*/
@Override
public void agendaGroupPopped(AgendaGroupPoppedEvent event) {
logger.debug("agendaGroupPopped: {}: AgendaEventListener.agendaGroupPopped({})", getFullName(), event);
}
/**
* {@inheritDoc}.
*/
@Override
public void agendaGroupPushed(AgendaGroupPushedEvent event) {
logger.debug("agendaGroupPushed: {}: AgendaEventListener.agendaGroupPushed({})", getFullName(), event);
}
/**
* {@inheritDoc}.
*/
@Override
public void beforeMatchFired(BeforeMatchFiredEvent event) {
logger.debug("beforeMatchFired: {}: AgendaEventListener.beforeMatchFired({})", getFullName(), event);
}
/**
* {@inheritDoc}.
*/
@Override
public void beforeRuleFlowGroupActivated(RuleFlowGroupActivatedEvent event) {
logger.debug("beforeRuleFlowGroupActivated: {}: AgendaEventListener.beforeRuleFlowGroupActivated({})",
getFullName(), event);
}
/**
* {@inheritDoc}.
*/
@Override
public void beforeRuleFlowGroupDeactivated(RuleFlowGroupDeactivatedEvent event) {
logger.debug("beforeRuleFlowGroupDeactivated: {}: AgendaEventListener.beforeRuleFlowGroupDeactivated({})",
getFullName(), event);
}
/**
* {@inheritDoc}.
*/
@Override
public void matchCancelled(MatchCancelledEvent event) {
logger.debug("matchCancelled: {}: AgendaEventListener.matchCancelled({})", getFullName(), event);
}
/**
* {@inheritDoc}.
*/
@Override
public void matchCreated(MatchCreatedEvent event) {
logger.debug("matchCreated: {}: AgendaEventListener.matchCreated({})", getFullName(), event);
}
/* ====================================== */
/* 'RuleRuntimeEventListener' interface */
/* ====================================== */
/**
* {@inheritDoc}.
*/
@Override
public void objectDeleted(ObjectDeletedEvent event) {
logger.debug("objectDeleted: {}: AgendaEventListener.objectDeleted({})", getFullName(), event);
}
/**
* {@inheritDoc}.
*/
@Override
public void objectInserted(ObjectInsertedEvent event) {
logger.debug("objectInserted: {}: AgendaEventListener.objectInserted({})", getFullName(), event);
}
/**
* {@inheritDoc}.
*/
@Override
public void objectUpdated(ObjectUpdatedEvent event) {
logger.debug("objectUpdated: {}: AgendaEventListener.objectUpdated({})", getFullName(), event);
}
/* ============================================================ */
/**
* This interface helps support the ability for features to choose the
* thread or threads that processes the 'KieSession'.
*/
public interface ThreadModel {
/**
* Start the thread or threads that do the 'KieSession' processing.
*/
void start();
/**
* Stop the thread or threads that do the 'KieSession' processing.
*/
void stop();
/**
* This method is called to notify the running session that
* 'KieContainer.updateToVersion(...)' has been called (meaning the
* full name of this session has changed).
*/
default void updated() {
}
}
/* ============================================================ */
/**
* This 'ThreadModel' variant uses 'KieSession.fireUntilHalt()'.
*/
public static class DefaultThreadModel implements Runnable, ThreadModel {
// session associated with this persistent thread
PolicySession session;
// the session thread
Thread thread;
// controls whether the thread loops or terminates
volatile boolean repeat = true;
/**
* Constructor - initialize 'session' and create thread.
*
* @param session the 'PolicySession' instance
*/
public DefaultThreadModel(PolicySession session) {
this.session = session;
thread = new Thread(this, getThreadName());
}
/**
* Get thread name.
*
* @return the String to use as the thread name
*/
private String getThreadName() {
return "Session " + session.getFullName();
}
/*=========================*/
/* 'ThreadModel' interface */
/*=========================*/
/**
* {@inheritDoc}.
*/
@Override
public void start() {
repeat = true;
thread.start();
}
/**
* {@inheritDoc}.
*/
@Override
public void stop() {
repeat = false;
// this should cause the thread to exit
session.getKieSession().halt();
try {
// wait up to 10 seconds for the thread to stop
thread.join(10000);
// one more interrupt, just in case the 'kieSession.halt()'
// didn't work for some reason
thread.interrupt();
} catch (InterruptedException e) {
logger.error("stopThread in thread.join error", e);
Thread.currentThread().interrupt();
}
}
/**
* {@inheritDoc}.
*/
@Override
public void updated() {
// the container artifact has been updated -- adjust the thread name
thread.setName(getThreadName());
}
/*======================*/
/* 'Runnable' interface */
/*======================*/
/**
* {@inheritDoc}.
*/
@Override
public void run() {
// set thread local variable
session.setPolicySession();
// We want to continue looping, despite any exceptions that occur
// while rules are fired.
var kieSession1 = session.getKieSession();
while (repeat) {
try {
kieSession1.fireUntilHalt();
// if we fall through, it means 'kieSession1.halt()' was called,
// but this may be a result of 'KieScanner' doing an update
} catch (Exception | LinkageError e) {
logger.error("startThread error in kieSession1.fireUntilHalt", e);
}
}
session.removePolicySession();
logger.info("fireUntilHalt() returned");
}
}
}