/*-
 * ============LICENSE_START=======================================================
 * policy-core
 * ================================================================================
 * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
 * ================================================================================
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * ============LICENSE_END=========================================================
 */

package org.openecomp.policy.drools.core;

import java.io.IOException;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;

import javax.persistence.EntityManagerFactory;
import javax.persistence.Persistence;

import org.eclipse.persistence.config.PersistenceUnitProperties;
import org.kie.api.KieBase;
import org.kie.api.KieServices;
import org.kie.api.builder.KieScanner;
import org.kie.api.builder.Message;
import org.kie.api.builder.ReleaseId;
import org.kie.api.builder.Results;
import org.kie.api.runtime.Environment;
import org.kie.api.runtime.EnvironmentName;
import org.kie.api.runtime.KieContainer;
import org.kie.api.runtime.KieSession;
import org.kie.api.runtime.KieSessionConfiguration;

import bitronix.tm.Configuration;
import bitronix.tm.TransactionManagerServices;
import bitronix.tm.resource.jdbc.PoolingDataSource;

import org.openecomp.policy.common.ia.IntegrityAudit;
import org.openecomp.policy.common.ia.IntegrityAuditProperties;
import org.openecomp.policy.drools.properties.Startable;
import org.openecomp.policy.common.logging.flexlogger.FlexLogger;
import org.openecomp.policy.common.logging.flexlogger.Logger;
import org.openecomp.policy.common.logging.eelf.MessageCodes;
import org.openecomp.policy.common.logging.eelf.PolicyLogger;
import org.openecomp.policy.common.logging.flexlogger.PropertyUtil;

/**
 * This class is a wrapper around 'KieContainer', which adds the ability
 * to automatically create and track KieSession instances.
 */
public class PolicyContainer implements Startable {
    // get an instance of logger
    private static final Logger logger = FlexLogger.getLogger(PolicyContainer.class);

    // 'KieServices' singleton
    private static final KieServices kieServices = KieServices.Factory.get();

    // set of all 'PolicyContainer' instances (guarded by 'synchronized(containers)')
    private static final HashSet<PolicyContainer> containers = new HashSet<>();

    // maps feature objects to per-PolicyContainer data
    private final ConcurrentHashMap<Object, Object> adjuncts = new ConcurrentHashMap<>();

    // 'KieContainer' associated with this 'PolicyContainer'
    private KieContainer kieContainer;

    // indicates whether the PolicyContainer is 'started'
    // (started = sessions created, threads running)
    private volatile boolean isStarted = false;

    // maps session name into the associated 'PolicySession' instance
    // (guarded by 'synchronized(sessions)')
    private final Map<String, PolicySession> sessions = new HashMap<>();

    // if not null, this is a 'KieScanner' looking for updates
    // (volatile, because it is assigned from the "Scanner Starter" thread)
    private volatile KieScanner scanner = null;

    // indicates whether the scanner has been started
    // (it can block for a long time)
    private boolean scannerStarted = false;

    // Used to set relative pathing to config files for unit test environment
    public static boolean isUnitTesting = false;

    /**
     * uses 'groupId', 'artifactId' and 'version', and fetches the associated
     * artifact and remaining dependencies from the Maven repository to create
     * the 'PolicyContainer' and associated 'KieContainer'.
     *
     * An exception occurs if the creation of the 'KieContainer' fails.
     *
     * @param groupId the 'groupId' associated with the artifact
     * @param artifactId the artifact name
     * @param version a comma-separated list of possible versions
     */
    public PolicyContainer(String groupId, String artifactId, String version) {
        this(kieServices.newReleaseId(groupId, artifactId, version));
    }

    /**
     * uses the 'groupId', 'artifactId' and 'version' information in 'ReleaseId',
     * and fetches the associated artifact and remaining dependencies from the
     * Maven repository to create the 'PolicyContainer' and associated
     * 'KieContainer'.
     *
     * An exception occurs if the creation of the 'KieContainer' fails.
     *
     * @param releaseId indicates the artifact that is to be installed in this
     *     container
     */
    public PolicyContainer(ReleaseId releaseId) {
        if (releaseId.getVersion().contains(",")) {
            // this is actually a comma-separated list of release ids
            releaseId = loadArtifact(releaseId.getGroupId(),
                                     releaseId.getArtifactId(),
                                     releaseId.getVersion());
        } else {
            kieContainer = kieServices.newKieContainer(releaseId);
        }

        // 'releaseId' cannot be null here: it was dereferenced above, and
        // 'loadArtifact' either returns a non-null value or throws
        synchronized (containers) {
            logger.info("Add a new kieContainer in containers: releaseId: " + releaseId.toString());
            containers.add(this);
        }
        startScanner(releaseId);
    }

    /**
     * Load an artifact into a new KieContainer. This method handles the
     * case where the 'version' is actually a comma-separated list of
     * versions. The versions are tried in order, and the first one for
     * which a 'KieContainer' can be created is used.
     *
     * @param groupId the 'groupId' associated with the artifact
     * @param artifactId the artifact name
     * @param version a comma-separated list of possible versions
     * @return the 'ReleaseId' of the version that was successfully loaded
     * @throws RuntimeException the most recent failure, if none of the
     *     versions could be loaded
     * @throws IllegalArgumentException if 'version' contains no versions
     *     at all (for example, it consists only of commas)
     */
    private ReleaseId loadArtifact(String groupId, String artifactId, String version) {
        String[] versions = version.split(",");
        if (versions.length > 1) {
            logger.info("Multiple KieContainer versions are specified: " + version);
        }

        // indicates a 'newKieContainer' call failed
        RuntimeException exception = null;

        // set prior to every 'newKieContainer' invocation
        // (if we are able to create the container, it will be the last
        // one that was successful)
        ReleaseId releaseId = null;
        for (String ver : versions) {
            try {
                // Create a 'ReleaseId' object describing the artifact, and
                // create a 'KieContainer' based upon it.
                logger.info("Create new KieContainer start, version = " + ver + " ...");

                releaseId = kieServices.newReleaseId(groupId, artifactId, ver);
                kieContainer = kieServices.newKieContainer(releaseId);

                // clear any exception, and break out of the loop
                exception = null;
                break;
            } catch (RuntimeException e) {
                exception = e;
            }
        }
        if (exception != null) {
            // all of the 'newKieContainer' invocations failed -- throw the
            // most recent exception
            throw exception;
        }
        if (releaseId == null) {
            // 'String.split' drops trailing empty strings, so a version such
            // as "," yields no entries, and no container was ever created
            throw new IllegalArgumentException(
                "No version specified for artifact " + groupId + ":" + artifactId
                + " (version = '" + version + "')");
        }
        return releaseId;
    }

    /**
     * @return the name of the container, which is the String equivalent of
     *     the 'ReleaseId'. It has the form:
     *
     *     (groupId + ":" + artifactId + ":" + version)
     *
     *     Note that the name changes after a successful call to 'updateToVersion',
     *     although typically only the 'version' part changes.
     */
    public String getName() {
        return kieContainer.getReleaseId().toString();
    }

    /**
     * @return the associated 'KieContainer' instance
     */
    public KieContainer getKieContainer() {
        return kieContainer;
    }

    /**
     * @return the 'ClassLoader' associated with the 'KieContainer' instance
     */
    public ClassLoader getClassLoader() {
        return kieContainer.getClassLoader();
    }

    /**
     * @return the Maven GroupId of the top-level artifact wrapped
     *     by the container.
     */
    public String getGroupId() {
        return kieContainer.getReleaseId().getGroupId();
    }

    /**
     * @return the Maven ArtifactId of the top-level artifact wrapped
     *     by the container.
     */
    public String getArtifactId() {
        return kieContainer.getReleaseId().getArtifactId();
    }

    /**
     * @return the version of the top-level artifact wrapped by the
     *     container (this may change as updates occur)
     */
    public String getVersion() {
        return kieContainer.getReleaseId().getVersion();
    }

    /**
     * Fetch the named 'PolicySession'.
     *
     * @param name the name of the KieSession (which is also the name of
     *     the associated PolicySession)
     * @return a PolicySession if found, 'null' if not
     */
    public PolicySession getPolicySession(String name) {
        // 'sessions' is a plain HashMap that other methods modify while
        // holding this lock, so reads need the same lock
        synchronized (sessions) {
            return sessions.get(name);
        }
    }

    /**
     * Internal method to create a PolicySession, possibly restoring it
     * from persistent storage.
     *
     * @param name of the KieSession and PolicySession
     * @param kieBaseName name of the associated 'KieBase' instance
     * @return a new or existing PolicySession, or 'null' if not found
     */
    private PolicySession activatePolicySession(String name, String kieBaseName) {
        synchronized (sessions) {
            logger.info("activatePolicySession:name :" + name);
            PolicySession session = sessions.get(name);
            if (session == null) {
                KieSession kieSession = null;

                // loop through all of the features, and give each one
                // a chance to create the 'KieSession'
                for (FeatureAPI feature : FeatureAPI.impl.getList()) {
                    kieSession = feature.activatePolicySession(this, name, kieBaseName);
                    if (kieSession != null) {
                        break;
                    }
                }

                // if none of the features created the session, create one now
                if (kieSession == null) {
                    kieSession = kieContainer.newKieSession(name);
                }

                if (kieSession != null) {
                    // creation of 'KieSession' was successful - build
                    // a PolicySession
                    session = new PolicySession(name, this, kieSession);
                    sessions.put(name, session);
                    logger.info("activatePolicySession:new session was added in sessions with name " + name);
                }
            }
            logger.info("activatePolicySession:session - "
                + (session == null ? "null" : session.getFullName()) + " is returned.");
            return session;
        }
    }

    /**
     * This creates a 'PolicySession' instance within this 'PolicyContainer',
     * and ties it to the specified 'KieSession'. 'name' must not currently
     * exist within the 'PolicyContainer', and the 'KieBase' object associated
     * with 'KieSession' must belong to the 'KieContainer'. This method provides
     * a way for 'KieSession' instances that are created programmatically to fit
     * into this framework.
     *
     * @param name the name for the new 'PolicySession'
     * @param kieSession a 'KieSession' instance, that will be included in
     *     this infrastructure
     * @return the new 'PolicySession'
     * @throws IllegalArgumentException if 'kieSession' does not reside within
     *     this container
     * @throws IllegalStateException if a 'PolicySession' already exists
     *     with this name
     */
    public PolicySession adoptKieSession(String name, KieSession kieSession)
        throws IllegalArgumentException, IllegalStateException {

        if (name == null) {
            logger.warn("adoptKieSession:input name is null");
        } else if (kieSession == null) {
            logger.warn("adoptKieSession:input kieSession is null");
        } else {
            logger.info("adoptKieSession:name: " + name + " kieSession: " + kieSession);
        }

        // fetch KieBase, and verify it belongs to this KieContainer
        boolean match = false;
        KieBase kieBase = kieSession.getKieBase();
        logger.info("adoptKieSession:kieBase: " + kieBase);
        for (String kieBaseName : kieContainer.getKieBaseNames()) {
            logger.info("adoptKieSession:kieBaseName: " + kieBaseName);
            if (kieBase == kieContainer.getKieBase(kieBaseName)) {
                match = true;
                break;
            }
        }
        logger.info("adoptKieSession:match " + match);

        // if we don't have a match yet, the last chance is to look at the
        // default KieBase, if it exists
        if (!match && kieBase != kieContainer.getKieBase()) {
            throw new IllegalArgumentException(
                "KieSession '" + name + "' does not reside within container " + getName());
        }

        synchronized (sessions) {
            if (sessions.get(name) != null) {
                throw new IllegalStateException(
                    "PolicySession '" + name + "' already exists");
            }

            // create the new 'PolicySession', add it to the table,
            // and return the object to the caller
            logger.info("adoptKieSession:create a new policySession with name " + name);
            PolicySession policySession = new PolicySession(name, this, kieSession);
            sessions.put(name, policySession);
            return policySession;
        }
    }

    /**
     * This calls 'KieContainer.updateToVersion()', and returns the associated
     * response as a String. If successful, the name of this 'PolicyContainer'
     * changes to match the new version.
     *
     * @param newVersion this is the version to update to (the 'groupId'
     *     and 'artifactId' remain the same)
     * @return the list of messages associated with the update (not sure if
     *     this can be 'null', or how to determine success/failure)
     */
    public String updateToVersion(String newVersion) {
        ReleaseId releaseId = kieContainer.getReleaseId();
        Results results = this.updateToVersion(
            kieServices.newReleaseId(releaseId.getGroupId(),
                                     releaseId.getArtifactId(),
                                     newVersion));

        List<Message> messages = (results == null ? null : results.getMessages());
        return (messages == null ? null : messages.toString());
    }

    /**
     * This calls 'KieContainer.updateToVersion()', and returns the associated
     * response. If successful, the name of this 'PolicyContainer' changes to
     * match the new version.
     *
     * @param releaseId the new artifact (usually new version) to be installed
     * @return the 'Results' parameter from 'KieContainer.updateToVersion'
     */
    public Results updateToVersion(ReleaseId releaseId) {
        if (releaseId == null) {
            logger.warn("updateToVersion:input releaseId is null");
        } else {
            logger.info("updateToVersion:releaseId " + releaseId.toString());
        }
        return kieContainer.updateToVersion(releaseId);
    }

    /**
     * @return all existing 'PolicyContainer' instances (a snapshot copy,
     *     which the caller may iterate over without holding any lock)
     */
    public static Collection<PolicyContainer> getPolicyContainers() {
        synchronized (containers) {
            return new HashSet<>(containers);
        }
    }

    /**
     * @return all of the 'PolicySession' instances (a snapshot copy)
     */
    public Collection<PolicySession> getPolicySessions() {
        // KLUDGE WARNING: this is a temporary workaround -- if there are
        // no features, we don't have persistence, and 'activate' is never
        // called. In this case, make sure the container is started.
        if (FeatureAPI.impl.getList().size() == 0) {
            start();
        }

        // return current set of PolicySessions
        synchronized (sessions) {
            return new HashSet<>(sessions.values());
        }
    }

    /**
     * This method will start a 'KieScanner' (if not currently running),
     * provided that the ReleaseId version is 'LATEST' or 'RELEASE',
     * or refers to a SNAPSHOT version.
     *
     * @param releaseId the release id used to create the container
     *     (if 'null', a warning is logged and no scanner is started)
     */
    public synchronized void startScanner(ReleaseId releaseId) {
        if (releaseId == null) {
            logger.warn("startScanner:input releaseId is null");
            return;
        }

        String version = releaseId.getVersion();
        if (scannerStarted || scanner != null || version == null) {
            return;
        }
        if (!("LATEST".equals(version)
                || "RELEASE".equals(version)
                || version.endsWith("-SNAPSHOT"))) {
            return;
        }

        // create the scanner, and poll at 60 second intervals
        try {
            scannerStarted = true;

            // start this in a separate thread -- it can block for a long time
            new Thread("Scanner Starter " + getName()) {
                @Override
                public void run() {
                    // the scanner is created and started on this thread, so
                    // failures have to be caught here -- a 'catch' around
                    // 'Thread.start()' never sees them
                    try {
                        scanner = kieServices.newKieScanner(kieContainer);
                        scanner.start(60000L);
                    } catch (Exception e) {
                        // sometimes the scanner initialization fails for some reason
                        logger.error(MessageCodes.EXCEPTION_ERROR, e, "main", "startServer");
                    }
                }
            }.start();
        } catch (Exception e) {
            // failure while building the thread name or starting the thread
            logger.error(MessageCodes.EXCEPTION_ERROR, e, "main", "startServer");
        }
    }

    /**
     * Insert a fact into a specific named session
     *
     * @param name this is the session name
     * @param object this is the fact to be inserted into the session
     * @return 'true' if the named session was found, 'false' if not
     */
    public boolean insert(String name, Object object) {
        // TODO: Should the definition of 'name' be expanded to include an
        // alternate entry point as well? For example, 'name.entryPoint' (or
        // something other than '.' if that is a problem).
        synchronized (sessions) {
            PolicySession session = sessions.get(name);
            if (session != null) {
                session.getKieSession().insert(object);
                return true;
            }
        }
        return false;
    }

    /**
     * Insert a fact into all sessions associated with this container
     *
     * @param object this is the fact to be inserted into the sessions
     * @return 'true' if the fact was inserted into at least one session,
     *     'false' if not
     */
    public boolean insertAll(Object object) {
        boolean rval = false;
        synchronized (sessions) {
            for (PolicySession session : sessions.values()) {
                session.getKieSession().insert(object);
                rval = true;
            }
        }
        return rval;
    }

    /*************************/
    /* 'Startable' interface */
    /*************************/

    /**
     * {@inheritDoc}
     */
    @Override
    public synchronized boolean start() {
        if (!isStarted) {
            // This will create all 'PolicySession' instances specified in the
            // 'kmodule.xml' file that don't exist yet
            for (String kieBaseName : kieContainer.getKieBaseNames()) {
                for (String kieSessionName :
                         kieContainer.getKieSessionNamesInKieBase(kieBaseName)) {
                    // if the 'PolicySession' does not currently exist, this method
                    // call will attempt to create it
                    PolicySession session =
                        activatePolicySession(kieSessionName, kieBaseName);
                    if (session != null) {
                        session.startThread();
                    }
                }
            }
            isStarted = true;
        }
        return true;
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public synchronized boolean stop() {
        if (isStarted) {
            Collection<PolicySession> localSessions;

            synchronized (sessions) {
                // local set containing all of the sessions
                localSessions = new HashSet<>(sessions.values());

                // clear the 'name->session' map in 'PolicyContainer'
                sessions.clear();
            }
            for (PolicySession session : localSessions) {
                // stop session thread
                session.stopThread();

                // free KieSession resources
                session.getKieSession().dispose();

                // notify features
                for (FeatureAPI feature : FeatureAPI.impl.getList()) {
                    feature.disposeKieSession(session);
                }
            }
            isStarted = false;
        }
        return true;
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public synchronized void shutdown() {
        // Note that this method does not call 'destroy' on the 'KieSession'
        // instances, which would remove any associated information in persistent
        // storage. Should it do this?

        stop();
        synchronized (containers) {
            containers.remove(this);
        }

        // How do we free the resources associated with the KieContainer?
        // Is garbage collection sufficient?
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public boolean isAlive() {
        return isStarted;
    }

    /*************************/

    /**
     * This method is similar to 'shutdown', but it also frees any persistence
     * resources as well.
     */
    public synchronized void destroy() {
        // we need all KieSession instances running in order to free
        // resources associated with persistence
        start();
        Collection<PolicySession> localSessions;

        synchronized (sessions) {
            // local set containing all of the sessions
            localSessions = new HashSet<>(sessions.values());

            // clear the 'name->session' map in 'PolicyContainer'
            sessions.clear();
        }
        for (PolicySession session : localSessions) {
            // stop session thread
            session.stopThread();

            // free KieSession resources
            session.getKieSession().destroy();

            // notify features
            for (FeatureAPI feature : FeatureAPI.impl.getList()) {
                feature.destroyKieSession(session);
            }
        }
        isStarted = false;

        synchronized (containers) {
            containers.remove(this);
        }

        // How do we free the resources associated with the KieContainer?
        // Is garbage collection sufficient?
    }

    /**
     * This method is called when the host goes from the 'standby->active' state.
     */
    public static void activate() {
        // start all of the 'PolicyContainer' instances -- iterate over a
        // snapshot, so containers that are concurrently created or shut down
        // can't trigger a 'ConcurrentModificationException'
        for (PolicyContainer container : getPolicyContainers()) {
            try {
                container.start();
            } catch (Exception e) {
                logger.error(MessageCodes.EXCEPTION_ERROR, e, "activate", "PolicyContainer.start()");
            }
        }
    }

    /**
     * This method is called when the host goes from the 'active->standby' state.
     */
    public static void deactivate() {
        // deactivate all of the 'PolicyContainer' instances -- iterate over a
        // snapshot, so containers that are concurrently created or shut down
        // can't trigger a 'ConcurrentModificationException'
        for (PolicyContainer container : getPolicyContainers()) {
            try {
                container.stop();
            } catch (Exception e) {
                logger.error(MessageCodes.EXCEPTION_ERROR, e, "deactivate", "PolicyContainer.stop()");
            }
        }
    }

    /**
     * This method does the following:
     *
     * 1) Selects the configuration directory
     * 2) Initializes logging
     * 3) Invokes 'globalInit' on all of the features
     *
     * It no longer reads in properties files, or creates 'PolicyContainer'
     * instances.
     *
     * @param args standard 'main' arguments, which are passed on to the
     *     features, but are otherwise not used here
     */
    public static void globalInit(String[] args) {
        /*
         * When JUnit testing, working directory should be
         * "../policy-management". In test environment, command line argument
         * should specify the relative path from this directory to the config
         * directory ("src/test/server/config")
         */
        String configDir = "config";
        if (isUnitTesting) {
            configDir = "src/test/server/config";
        }
        System.out.println("PolicyContainer.main: configDir=" + configDir);

        logger.info("Calling initlogger");

        initlogger(configDir);
        logger.info("initlogger returned");

        // invoke 'globalInit' on all of the features
        for (FeatureAPI feature : FeatureAPI.impl.getList()) {
            feature.globalInit(args, configDir);
        }
    }

    /**
     * Read in the logger properties, and use them to initialize 'PolicyLogger'.
     * Failures are logged, but are not propagated to the caller.
     *
     * @param configDir the directory containing 'policyLogger.properties'
     */
    private static void initlogger(String configDir) {
        Properties properties;
        try {
            properties = PropertyUtil.getProperties(configDir + "/policyLogger.properties");
        } catch (IOException e1) {
            logger.error(MessageCodes.MISS_PROPERTY_ERROR, e1, "initlogger");
            return;
        }

        try {
            PolicyLogger.init(properties);
        } catch (Exception e) {
            logger.error(MessageCodes.MISS_PROPERTY_ERROR, e, "initlogger");
        }
    }

    /**
     * Fetch the adjunct object associated with a given feature
     *
     * @param object this is typically the singleton feature object that is
     *     used as a key, but it might also be useful to use nested objects
     *     within the feature as keys.
     * @return a feature-specific object associated with the key, or 'null'
     *     if it is not found.
     */
    public Object getAdjunct(Object object) {
        return adjuncts.get(object);
    }

    /**
     * Store the adjunct object associated with a given feature
     *
     * @param object this is typically the singleton feature object that is
     *     used as a key, but it might also be useful to use nested objects
     *     within the feature as keys.
     * @param value a feature-specific object associated with the key, or 'null'
     *     if the feature-specific object should be removed
     */
    public void setAdjunct(Object object, Object value) {
        // 'ConcurrentHashMap' does not accept 'null' values, so 'null'
        // is treated as a request to remove the entry
        if (value == null) {
            adjuncts.remove(object);
        } else {
            adjuncts.put(object, value);
        }
    }
}