diff options
author | Kevin McKiou <km097d@att.com> | 2017-08-22 16:08:06 -0500 |
---|---|---|
committer | Jorge Hernandez <jh1730@att.com> | 2017-08-22 22:04:57 +0000 |
commit | 36cf73f8313cbd1baac4bc41565bee23690fc152 (patch) | |
tree | a2fc708564a866403163d084eb781bbe9bc5cbf6 /feature-session-persistence/src/main/java/org | |
parent | 9356ffd28172c6cd0ea3ce80c4a49f4d8525bb30 (diff) |
Add feature-session-persistence
This commit adds the feature-session-persistence module
which will persist drools session data to allow stateful
transactions which can persist across node restarts and
failovers. It also picks up recent changes to the master
branch to avoid merge conflicts.
Issue-ID: POLICY-133
Change-Id: Ifdcd8280ea6df07db79562f1b01fa90296a8b878
Signed-off-by: Kevin McKiou <km097d@att.com>
Diffstat (limited to 'feature-session-persistence/src/main/java/org')
8 files changed, 1507 insertions, 0 deletions
diff --git a/feature-session-persistence/src/main/java/org/onap/policy/drools/persistence/DroolsPersistenceProperties.java b/feature-session-persistence/src/main/java/org/onap/policy/drools/persistence/DroolsPersistenceProperties.java new file mode 100644 index 00000000..fabcbc03 --- /dev/null +++ b/feature-session-persistence/src/main/java/org/onap/policy/drools/persistence/DroolsPersistenceProperties.java @@ -0,0 +1,34 @@ +/*- + * ============LICENSE_START======================================================= + * feature-session-persistence + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.drools.persistence; + +public class DroolsPersistenceProperties { + /* + * feature-session-persistence.properties parameter key values + */ + public static final String DB_DRIVER = "javax.persistence.jdbc.driver"; + public static final String DB_DATA_SOURCE = "hibernate.dataSource"; + public static final String DB_URL = "javax.persistence.jdbc.url"; + public static final String DB_USER = "javax.persistence.jdbc.user"; + public static final String DB_PWD = "javax.persistence.jdbc.password"; + public static final String DB_SESSIONINFO_TIMEOUT = + "persistence.sessioninfo.timeout"; +} diff --git a/feature-session-persistence/src/main/java/org/onap/policy/drools/persistence/DroolsSession.java b/feature-session-persistence/src/main/java/org/onap/policy/drools/persistence/DroolsSession.java new file mode 100644 index 00000000..a012f3c2 --- /dev/null +++ b/feature-session-persistence/src/main/java/org/onap/policy/drools/persistence/DroolsSession.java @@ -0,0 +1,35 @@ +/*- + * ============LICENSE_START======================================================= + * feature-session-persistence + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.drools.persistence; + +import java.util.Date; + +public interface DroolsSession { + + public String getSessionName(); + public void setSessionName(String sessionName); + public long getSessionId(); + public void setSessionId(long sessionId); + public Date getCreatedDate(); + public void setCreatedDate(Date createdDate); + public Date getUpdatedDate(); + public void setUpdatedDate(Date updatedDate); +} diff --git a/feature-session-persistence/src/main/java/org/onap/policy/drools/persistence/DroolsSessionConnector.java b/feature-session-persistence/src/main/java/org/onap/policy/drools/persistence/DroolsSessionConnector.java new file mode 100644 index 00000000..e2f4ac54 --- /dev/null +++ b/feature-session-persistence/src/main/java/org/onap/policy/drools/persistence/DroolsSessionConnector.java @@ -0,0 +1,38 @@ +/*- + * ============LICENSE_START======================================================= + * feature-session-persistence + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.drools.persistence; + +public interface DroolsSessionConnector +{ + /** + * Gets a session by PDP id and name. + * @param sessName + * @return a session, or {@code null} if it is not found + */ + public DroolsSession get(String sessName); + + /** + * Replaces a session, adding it if it does not exist. + * @param sess session to be replaced + */ + public void replace(DroolsSession sess); + +} diff --git a/feature-session-persistence/src/main/java/org/onap/policy/drools/persistence/DroolsSessionEntity.java b/feature-session-persistence/src/main/java/org/onap/policy/drools/persistence/DroolsSessionEntity.java new file mode 100644 index 00000000..71f5ec9a --- /dev/null +++ b/feature-session-persistence/src/main/java/org/onap/policy/drools/persistence/DroolsSessionEntity.java @@ -0,0 +1,140 @@ +/*- + * ============LICENSE_START======================================================= + * feature-session-persistence + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.drools.persistence; + +import java.io.Serializable; +import java.util.Date; + +import javax.persistence.Column; +import javax.persistence.Entity; +import javax.persistence.Id; +import javax.persistence.PrePersist; +import javax.persistence.PreUpdate; +import javax.persistence.Temporal; +import javax.persistence.TemporalType; + +@Entity +public class DroolsSessionEntity implements Serializable, DroolsSession { + + private static final long serialVersionUID = -5495057038819948709L; + + @Id + @Column(name="sessionName", nullable=false) + private String sessionName = "-1"; + + @Column(name="sessionId", nullable=false) + private long sessionId = -1L; + + @Temporal(TemporalType.TIMESTAMP) + @Column(name="createdDate", nullable=false) + private Date createdDate; + + @Temporal(TemporalType.TIMESTAMP) + @Column(name="updatedDate", nullable=false) + private Date updatedDate; + + + public DroolsSessionEntity() { + + } + + public DroolsSessionEntity(String sessionName, + long sessionId) { + this.sessionName = sessionName; + this.sessionId = sessionId; + + } + + @PrePersist + public void prePersist() { + this.createdDate = new Date(); + this.updatedDate = new Date(); + } + + @PreUpdate + public void preUpdate() { + this.updatedDate = new Date(); + } + + @Override + public String getSessionName() { + return sessionName; + } + @Override + public void setSessionName(String sessionName) { + this.sessionName = sessionName; + } + @Override + public long getSessionId() { + return sessionId; + } + + @Override + public void setSessionId(long sessionId) { + this.sessionId = sessionId; + } + + public Date getCreatedDate() { + return createdDate; + } + + + public void setCreatedDate(Date createdDate) { + this.createdDate = createdDate; + } + + + public Date getUpdatedDate() { + return updatedDate; + } + + + public void setUpdatedDate(Date updatedDate) { + this.updatedDate = updatedDate; + } + + + @Override + public boolean equals(Object other){ + if(other instanceof DroolsSession){ + DroolsSession p = (DroolsSession) other; + return this.getSessionName().equals(p.getSessionName()); + }else{ + return false; + } + } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + getSessionName().hashCode(); + return result; + } + + @Override + public String toString() { + return "{name=" + getSessionName() + + ", id=" + getSessionId() + "}"; + } + + +} diff --git a/feature-session-persistence/src/main/java/org/onap/policy/drools/persistence/EntityMgrCloser.java b/feature-session-persistence/src/main/java/org/onap/policy/drools/persistence/EntityMgrCloser.java new file mode 100644 index 00000000..58292117 --- /dev/null +++ b/feature-session-persistence/src/main/java/org/onap/policy/drools/persistence/EntityMgrCloser.java @@ -0,0 +1,49 @@ +/*- + * ============LICENSE_START======================================================= + * feature-session-persistence + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.drools.persistence; + +import javax.persistence.EntityManager; + +/** + * Wrapper for an <i>EntityManager</i>, providing auto-close functionality. + */ +public class EntityMgrCloser implements AutoCloseable { + + /** + * The wrapper manager. + */ + private final EntityManager em; + + /** + * + * @param em + * manager to be auto-closed + */ + public EntityMgrCloser(EntityManager em) { + this.em = em; + } + + @Override + public void close() { + em.close(); + } + +} diff --git a/feature-session-persistence/src/main/java/org/onap/policy/drools/persistence/EntityMgrTrans.java b/feature-session-persistence/src/main/java/org/onap/policy/drools/persistence/EntityMgrTrans.java new file mode 100644 index 00000000..79b620d3 --- /dev/null +++ b/feature-session-persistence/src/main/java/org/onap/policy/drools/persistence/EntityMgrTrans.java @@ -0,0 +1,81 @@ +/*- + * ============LICENSE_START======================================================= + * feature-session-persistence + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.drools.persistence; + +import javax.persistence.EntityManager; +import javax.persistence.EntityTransaction; + +/** + * Wrapper for an <i>EntityManager</i> that creates a transaction that is + * auto-rolled back when closed. + */ +public class EntityMgrTrans extends EntityMgrCloser { + + /** + * Transaction to be rolled back. + */ + private EntityTransaction trans; + + /** + * + * @param em + * entity for which a transaction is to be begun + */ + public EntityMgrTrans(EntityManager em) { + super(em); + + try { + trans = em.getTransaction(); + trans.begin(); + + } catch (RuntimeException e) { + em.close(); + throw e; + } + } + + /** + * Commits the transaction. + */ + public void commit() { + trans.commit(); + } + + /** + * Rolls back the transaction. + */ + public void rollback() { + trans.rollback(); + } + + @Override + public void close() { + try { + if (trans.isActive()) { + trans.rollback(); + } + + } finally { + super.close(); + } + } + +} diff --git a/feature-session-persistence/src/main/java/org/onap/policy/drools/persistence/JpaDroolsSessionConnector.java b/feature-session-persistence/src/main/java/org/onap/policy/drools/persistence/JpaDroolsSessionConnector.java new file mode 100644 index 00000000..76c09681 --- /dev/null +++ b/feature-session-persistence/src/main/java/org/onap/policy/drools/persistence/JpaDroolsSessionConnector.java @@ -0,0 +1,116 @@ +/*- + * ============LICENSE_START======================================================= + * feature-session-persistence + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.drools.persistence; + +import javax.persistence.EntityManager; +import javax.persistence.EntityManagerFactory; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class JpaDroolsSessionConnector implements DroolsSessionConnector { + + private static Logger logger = LoggerFactory.getLogger(JpaDroolsSessionConnector.class); + + private final EntityManagerFactory emf; + + + public JpaDroolsSessionConnector(EntityManagerFactory emf) { + this.emf = emf; + } + + @Override + public DroolsSession get(String sessName) { + + EntityManager em = emf.createEntityManager(); + DroolsSessionEntity s = null; + + try(EntityMgrTrans trans = new EntityMgrTrans(em)) { + + s = em.find(DroolsSessionEntity.class, sessName); + if(s != null) { + em.refresh(s); + } + + trans.commit(); + } + + return s; + } + + @Override + public void replace(DroolsSession sess) { + String sessName = sess.getSessionName(); + + logger.info("replace: Entering and manually updating session name= {}", sessName); + + EntityManager em = emf.createEntityManager(); + + try(EntityMgrTrans trans = new EntityMgrTrans(em)) { + + if( ! update(em, sess)) { + add(em, sess); + } + + trans.commit(); + } + + logger.info("replace: Exiting"); + } + + /** + * Adds a session to the persistent store. + * @param em entity manager + * @param sess session to be added + */ + private void add(EntityManager em, DroolsSession sess) { + logger.info("add: Inserting session id={}", sess.getSessionId()); + + DroolsSessionEntity ent = + new DroolsSessionEntity( + sess.getSessionName(), + sess.getSessionId()); + + em.persist(ent); + } + + /** + * Updates a session, if it exists within the persistent store. + * @param em entity manager + * @param sess session data to be persisted + * @return {@code true} if a record was updated, {@code false} if it + * was not found + */ + private boolean update(EntityManager em, DroolsSession sess) { + + DroolsSessionEntity s = + em.find(DroolsSessionEntity.class, sess.getSessionName()); + if(s == null) { + return false; + } + + logger.info("update: Updating session id to {}", sess.getSessionId()); + s.setSessionId( sess.getSessionId()); + + return true; + } +} diff --git a/feature-session-persistence/src/main/java/org/onap/policy/drools/persistence/PersistenceFeature.java b/feature-session-persistence/src/main/java/org/onap/policy/drools/persistence/PersistenceFeature.java new file mode 100644 index 00000000..e6603b68 --- /dev/null +++ b/feature-session-persistence/src/main/java/org/onap/policy/drools/persistence/PersistenceFeature.java @@ -0,0 +1,1014 @@ +/*- + * ============LICENSE_START======================================================= + * feature-session-persistence + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.drools.persistence; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import javax.persistence.EntityManagerFactory; +import javax.persistence.Persistence; + +import org.eclipse.persistence.config.PersistenceUnitProperties; +import org.kie.api.KieServices; +import org.kie.api.runtime.Environment; +import org.kie.api.runtime.EnvironmentName; +import org.kie.api.runtime.KieSession; +import org.kie.api.runtime.KieSessionConfiguration; +import org.onap.policy.drools.core.PolicyContainer; +import org.onap.policy.drools.core.PolicySession; +import org.onap.policy.drools.core.PolicySessionFeatureAPI; +import org.onap.policy.drools.features.PolicyEngineFeatureAPI; +import org.onap.policy.drools.system.PolicyController; +import org.onap.policy.drools.system.PolicyEngine; +import org.onap.policy.drools.utils.PropertyUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import bitronix.tm.BitronixTransactionManager; +import bitronix.tm.Configuration; +import bitronix.tm.TransactionManagerServices; +import bitronix.tm.resource.jdbc.PoolingDataSource; + +/** + * If this feature is supported, there is a single instance of it. It adds + * persistence to Drools sessions. In addition, if an active-standby feature + * exists, then that is used to determine the active and last-active PDP. If it + * does not exist, then the current host name is used as the PDP id. + * + * The bulk of the code here was once in other classes, such as + * 'PolicyContainer' and 'Main'. It was moved here as part of making this a + * separate optional feature. + */ +public class PersistenceFeature implements PolicySessionFeatureAPI, PolicyEngineFeatureAPI { + + private static final Logger logger = LoggerFactory.getLogger(PersistenceFeature.class); + + + /** + * Standard factory used to get various items. + */ + private static Factory stdFactory = new Factory(); + + /** + * Factory used to get various items. + */ + private Factory fact = stdFactory; + + /** + * KieService factory. + */ + private KieServices kieSvcFact; + + /** + * Host name. + */ + private String hostName; + + /** + * Persistence properties. + */ + private Properties persistProps; + + /** + * Whether or not the SessionInfo records should be cleaned out. + */ + private boolean sessInfoCleaned; + + /** + * SessionInfo timeout, in milli-seconds, as read from + * {@link #persistProps}. + */ + private long sessionInfoTimeoutMs; + + /** + * Object used to serialize cleanup of sessioninfo table. + */ + private Object cleanupLock = new Object(); + + /** + * Sets the factory to be used during junit testing. + * + * @param fact + * factory to be used + */ + protected void setFactory(Factory fact) { + this.fact = fact; + } + + /** + * Lookup the adjunct for this feature that is associated with the specified + * PolicyContainer. If not found, create one. + * + * @param policyContainer + * the container whose adjunct we are looking up, and possibly + * creating + * @return the associated 'ContainerAdjunct' instance, which may be new + */ + private ContainerAdjunct getContainerAdjunct(PolicyContainer policyContainer) { + + Object rval = policyContainer.getAdjunct(this); + + if (rval == null || !(rval instanceof ContainerAdjunct)) { + // adjunct does not exist, or has the wrong type (should never + // happen) + rval = new ContainerAdjunct(policyContainer); + policyContainer.setAdjunct(this, rval); + } + + return ((ContainerAdjunct) rval); + } + + /** + * {@inheritDoc} + */ + @Override + public int getSequenceNumber() { + return (1); + } + + /** + * {@inheritDoc} + */ + @Override + public void globalInit(String args[], String configDir) { + + kieSvcFact = fact.getKieServices(); + + initHostName(); + + try { + persistProps = fact.loadProperties(configDir + "/feature-session-persistence.properties"); + + } catch (IOException e1) { + logger.error("initializePersistence: ", e1); + } + + sessionInfoTimeoutMs = getPersistenceTimeout(); + + Configuration bitronixConfiguration = fact.getTransMgrConfig(); + bitronixConfiguration.setJournal(null); + bitronixConfiguration.setServerId(hostName); + } + + /** + * Creates a persistent KieSession, loading it from the persistent store, or + * creating one, if it does not exist yet. + */ + @Override + public KieSession activatePolicySession(PolicyContainer policyContainer, String name, String kieBaseName) { + + if (isPersistenceEnabled(policyContainer, name)) { + cleanUpSessionInfo(); + + return getContainerAdjunct(policyContainer).newPersistentKieSession(name, kieBaseName); + } + + return null; + } + + /** + * {@inheritDoc} + */ + @Override + public PolicySession.ThreadModel selectThreadModel(PolicySession session) { + PolicyContainer policyContainer = session.getPolicyContainer(); + if (isPersistenceEnabled(policyContainer, session.getName())) { + return (new PersistentThreadModel(session, getProperties(policyContainer))); + } + return (null); + } + + /** + * {@inheritDoc} + */ + @Override + public void disposeKieSession(PolicySession policySession) { + + ContainerAdjunct contAdj = (ContainerAdjunct) policySession.getPolicyContainer().getAdjunct(this); + if(contAdj != null) { + contAdj.disposeKieSession( policySession.getName()); + } + } + + /** + * {@inheritDoc} + */ + @Override + public void destroyKieSession(PolicySession policySession) { + + ContainerAdjunct contAdj = (ContainerAdjunct) policySession.getPolicyContainer().getAdjunct(this); + if(contAdj != null) { + contAdj.destroyKieSession( policySession.getName()); + } + } + + /** + * {@inheritDoc} + */ + @Override + public boolean afterStart(PolicyEngine engine) { + return false; + } + + /** + * {@inheritDoc} + */ + @Override + public boolean beforeStart(PolicyEngine engine) { + synchronized (cleanupLock) { + sessInfoCleaned = false; + } + + return false; + } + + /** + * {@inheritDoc} + */ + @Override + public boolean beforeActivate(PolicyEngine engine) { + synchronized (cleanupLock) { + sessInfoCleaned = false; + } + + return false; + } + + /** + * {@inheritDoc} + */ + @Override + public boolean afterActivate(PolicyEngine engine) { + return false; + } + + /* ============================================================ */ + + /** + * Gets the persistence timeout value for sessioninfo records. + * + * @return the timeout value, in milli-seconds, or {@code -1} if it is + * unspecified or invalid + */ + private long getPersistenceTimeout() { + String timeoutString = null; + + try { + timeoutString = persistProps.getProperty(DroolsPersistenceProperties.DB_SESSIONINFO_TIMEOUT); + + if (timeoutString != null) { + // timeout parameter is specified + return Long.valueOf(timeoutString) * 1000; + } + + } catch (NumberFormatException e) { + logger.error("Invalid value for Drools persistence property persistence.sessioninfo.timeout: {}", + timeoutString, e); + } + + return -1; + } + + /** + * Initializes {@link #hostName}. + */ + private void initHostName() { + + try { + hostName = fact.getHostName(); + + } catch (UnknownHostException e) { + throw new RuntimeException("cannot determine local hostname", e); + } + } + + /* ============================================================ */ + + /** + * Each instance of this class is a logical extension of a 'PolicyContainer' + * instance. Its reference is stored in the 'adjuncts' table within the + * 'PolicyContainer', and will be garbage-collected with the container. + */ + protected class ContainerAdjunct { + /** + * 'PolicyContainer' instance that this adjunct is extending. + */ + private PolicyContainer policyContainer; + + /** + * Maps a KIE session name to its data source. + */ + private Map<String,PoolingDataSource> name2ds = new HashMap<>(); + + /** + * Constructor - initialize a new 'ContainerAdjunct' + * + * @param policyContainer + * the 'PolicyContainer' instance this adjunct is extending + */ + private ContainerAdjunct(PolicyContainer policyContainer) { + this.policyContainer = policyContainer; + } + + /** + * Create a new persistent KieSession. If there is already a + * corresponding entry in the database, it is used to initialize the + * KieSession. If not, a completely new session is created. + * + * @param name + * the name of the KieSession (which is also the name of the + * associated PolicySession) + * @param kieBaseName + * the name of the 'KieBase' instance containing this session + * @return a new KieSession with persistence enabled + */ + private KieSession newPersistentKieSession(String name, String kieBaseName) { + + long desiredSessionId; + + DroolsSessionConnector conn = getDroolsSessionConnector("onapPU"); + + desiredSessionId = getSessionId(conn, name); + + logger.info("\n\nThis controller is primary... coming up with session {} \n\n", desiredSessionId); + + // session does not exist -- attempt to create one + logger.info("getPolicySession:session does not exist -- attempt to create one with name {}", name); + + System.getProperties().put("java.naming.factory.initial", "bitronix.tm.jndi.BitronixInitialContextFactory"); + + Environment env = kieSvcFact.newEnvironment(); + String dsName = loadDataSource(name); + + configureKieEnv(name, env, dsName); + + KieSessionConfiguration kConf = kieSvcFact.newKieSessionConfiguration(); + + KieSession kieSession = (desiredSessionId >= 0 ? loadKieSession(kieBaseName, desiredSessionId, env, kConf) + : null); + + if (kieSession == null) { + // loadKieSession() returned null or desiredSessionId < 0 + logger.info("LOADING We cannot load session {}. Going to create a new one", desiredSessionId); + + kieSession = newKieSession(kieBaseName, env); + } + + replaceSession(conn, name, kieSession); + + return kieSession; + } + + /** + * Loads a data source into {@link #name2ds}, if one doesn't exist + * yet. + * @param sessName session name + * @return the unique data source name + */ + private String loadDataSource(String sessName) { + PoolingDataSource ds = name2ds.get(sessName); + + if(ds == null) { + Properties props = new Properties(); + addOptProp(props, "URL", persistProps.getProperty(DroolsPersistenceProperties.DB_URL)); + addOptProp(props, "user", persistProps.getProperty(DroolsPersistenceProperties.DB_USER)); + addOptProp(props, "password", persistProps.getProperty(DroolsPersistenceProperties.DB_PWD)); + + ds = fact.makePoolingDataSource(); + ds.setUniqueName("jdbc/BitronixJTADataSource/" + sessName); + ds.setClassName(persistProps.getProperty(DroolsPersistenceProperties.DB_DATA_SOURCE)); + ds.setMaxPoolSize(3); + ds.setIsolationLevel("SERIALIZABLE"); + ds.setAllowLocalTransactions(true); + ds.getDriverProperties().putAll(props); + ds.init(); + + name2ds.put(sessName, ds); + } + + return ds.getUniqueName(); + } + + /** + * Configures a Kie Environment + * + * @param name + * session name + * @param env + * environment to be configured + * @param dsName + * data source name + */ + private void configureKieEnv(String name, Environment env, String dsName) { + Properties emfProperties = new Properties(); + emfProperties.setProperty(PersistenceUnitProperties.JTA_DATASOURCE, dsName); + + EntityManagerFactory emfact = fact.makeEntMgrFact("onapsessionsPU", emfProperties); + + env.set(EnvironmentName.ENTITY_MANAGER_FACTORY, emfact); + env.set(EnvironmentName.TRANSACTION_MANAGER, fact.getTransMgr()); + } + + /** + * Loads an existing KieSession from the persistent store. + * + * @param kieBaseName + * the name of the 'KieBase' instance containing this session + * @param desiredSessionId + * id of the desired KieSession + * @param env + * Kie Environment for the session + * @param kConf + * Kie Configuration for the session + * @return the persistent session, or {@code null} if it could not be + * loaded + */ + private KieSession loadKieSession(String kieBaseName, long desiredSessionId, Environment env, + KieSessionConfiguration kConf) { + try { + KieSession kieSession = kieSvcFact.getStoreServices().loadKieSession(desiredSessionId, + policyContainer.getKieContainer().getKieBase(kieBaseName), kConf, env); + + logger.info("LOADING Loaded session {}", desiredSessionId); + + return kieSession; + + } catch (Exception e) { + logger.error("loadKieSession error: ", e); + return null; + } + } + + /** + * Creates a new, persistent KieSession. + * + * @param kieBaseName + * the name of the 'KieBase' instance containing this session + * @param env + * Kie Environment for the session + * @return a new, persistent session + */ + private KieSession newKieSession(String kieBaseName, Environment env) { + KieSession kieSession = kieSvcFact.getStoreServices() + .newKieSession(policyContainer.getKieContainer().getKieBase(kieBaseName), null, env); + + logger.info("LOADING CREATED {}", kieSession.getIdentifier()); + + return kieSession; + } + + /** + * Closes the data source associated with a session. + * @param name name of the session being destroyed + */ + private void destroyKieSession(String name) { + closeDataSource(name); + } + + /** + * Closes the data source associated with a session. + * @param name name of the session being disposed of + */ + private void disposeKieSession(String name) { + closeDataSource(name); + } + + /** + * Closes the data source associated with a session. + * @param name name of the session whose data source is to be closed + */ + private void closeDataSource(String name) { + PoolingDataSource ds = name2ds.remove(name); + if(ds != null) { + ds.close(); + } + } + } + + /* ============================================================ */ + + /** + * Removes "old" Drools 'sessioninfo' records, so they aren't used to + * restore data to Drools sessions. This also has the useful side-effect of + * removing abandoned records as well. + */ + private void cleanUpSessionInfo() { + + synchronized (cleanupLock) { + + if (sessInfoCleaned) { + logger.info("Clean up of sessioninfo table: already done"); + return; + } + + if (sessionInfoTimeoutMs < 0) { + logger.info("Clean up of sessioninfo table: no timeout specified"); + return; + } + + // get DB connection properties + String url = persistProps.getProperty(DroolsPersistenceProperties.DB_URL); + String user = persistProps.getProperty(DroolsPersistenceProperties.DB_USER); + String password = persistProps.getProperty(DroolsPersistenceProperties.DB_PWD); + + if (url == null || user == null || password == null) { + logger.error("Missing DB properties for clean up of sessioninfo table"); + return; + } + + // now do the record deletion + try (Connection connection = fact.makeDbConnection(url, user, password); + PreparedStatement statement = connection.prepareStatement( + "DELETE FROM sessioninfo WHERE timestampdiff(second,lastmodificationdate,now()) > ?")) { + statement.setLong(1, sessionInfoTimeoutMs/1000); + + int count = statement.executeUpdate(); + logger.info("Cleaning up sessioninfo table -- {} records removed", count); + + } catch (SQLException e) { + logger.error("Clean up of sessioninfo table failed", e); + } + + // TODO: delete DroolsSessionEntity where sessionId not in + // (sessinfo.xxx) + + sessInfoCleaned = true; + } + } + + /** + * Gets a connector for manipulating DroolsSession objects within the + * persistent store. + * + * @param pu + * @return a connector for DroolsSession objects + */ + private DroolsSessionConnector getDroolsSessionConnector(String pu) { + + Properties propMap = new Properties(); + addOptProp(propMap, "javax.persistence.jdbc.driver", + persistProps.getProperty(DroolsPersistenceProperties.DB_DRIVER)); + addOptProp(propMap, "javax.persistence.jdbc.url", persistProps.getProperty(DroolsPersistenceProperties.DB_URL)); + addOptProp(propMap, "javax.persistence.jdbc.user", + persistProps.getProperty(DroolsPersistenceProperties.DB_USER)); + addOptProp(propMap, "javax.persistence.jdbc.password", + persistProps.getProperty(DroolsPersistenceProperties.DB_PWD)); + + return fact.makeJpaConnector(pu, propMap); + } + + /** + * Adds an optional property to a set of properties. + * @param propMap map into which the property should be added + * @param name property name + * @param value property value, or {@code null} if it should not + * be added + */ + private void addOptProp(Properties propMap, String name, String value) { + if (value != null) { + propMap.put(name, value); + } + } + + /** + * Gets a session's ID from the persistent store. + * + * @param conn + * persistence connector + * @param sessnm + * name of the session + * @return the session's id, or {@code -1} if the session is not found + */ + private long getSessionId(DroolsSessionConnector conn, String sessnm) { + DroolsSession sess = conn.get(sessnm); + return (sess != null ? sess.getSessionId() : -1); + } + + /** + * Replaces a session within the persistent store, if it exists. Adds + * it otherwise. + * + * @param conn + * persistence connector + * @param sessnm + * name of session to be updated + * @param kieSession + * new session information + */ + private void replaceSession(DroolsSessionConnector conn, String sessnm, KieSession kieSession) { + + DroolsSessionEntity sess = new DroolsSessionEntity(); + + sess.setSessionName(sessnm); + sess.setSessionId(kieSession.getIdentifier()); + + conn.replace(sess); + } + + /** + * Determine whether persistence is enabled for a specific container + * + * @param container + * container to be checked + * @param sessionName + * name of the session to be checked + * @return {@code true} if persistence is enabled for this container, and + * {@code false} if not + */ + private boolean isPersistenceEnabled(PolicyContainer container, String sessionName) { + Properties properties = getProperties(container); + boolean rval = false; + + if (properties != null) { + // fetch the 'type' property + String type = getProperty(properties, sessionName, "type"); + rval = ("auto".equals(type) || "native".equals(type)); + } + + return (rval); + } + + /** + * Determine the controller properties associated with the policy container. + * + * @param container + * container whose properties are to be retrieved + * @return the container's properties, or {@code null} if not found + */ + private Properties getProperties(PolicyContainer container) { + try { + return (fact.getPolicyContainer(container).getProperties()); + } catch (IllegalArgumentException e) { + logger.error("getProperties exception: ", e); + return (null); + } + } + + /** + * Fetch the persistence property associated with a session. The name may + * have the form: + * <ul> + * <li>persistence.SESSION-NAME.PROPERTY</li> + * <li>persistence.PROPERTY</li> + * </ul> + * + * @param properties + * properties from which the value is to be retrieved + * @param sessionName + * session name of interest + * @param property + * property name of interest + * @return the property value, or {@code null} if not found + */ + private String getProperty(Properties properties, String sessionName, String property) { + String value = properties.getProperty("persistence." + sessionName + "." + property); + if (value == null) { + value = properties.getProperty("persistence." + property); + } + + return (value); + } + + /* ============================================================ */ + + /** + * This 'ThreadModel' variant periodically calls + * 'KieSession.fireAllRules()', because the 'fireUntilHalt' method isn't + * compatible with persistence. + */ + public class PersistentThreadModel implements Runnable, PolicySession.ThreadModel { + + /** + * Session associated with this persistent thread. + */ + private final PolicySession session; + + /** + * The session thread. + */ + private final Thread thread; + + /** + * Used to indicate that processing should stop. + */ + private final CountDownLatch stopped = new CountDownLatch(1); + + /** + * Minimum time, in milli-seconds, that the thread should sleep + * before firing rules again. + */ + long minSleepTime = 100; + + /** + * Maximum time, in milli-seconds, that the thread should sleep + * before firing rules again. This is a "half" time, so that + * we can multiply it by two without overflowing the word size. + */ + long halfMaxSleepTime = 5000 / 2; + + /** + * Constructor - initialize variables and create thread + * + * @param session + * the 'PolicySession' instance + * @param properties + * may contain additional session properties + */ + public PersistentThreadModel(PolicySession session, Properties properties) { + this.session = session; + this.thread = new Thread(this, getThreadName()); + + if (properties == null) { + return; + } + + // extract 'minSleepTime' and/or 'maxSleepTime' + String name = session.getName(); + + // fetch 'minSleepTime' value, and update if defined + String sleepTimeString = getProperty(properties, name, "minSleepTime"); + if (sleepTimeString != null) { + try { + minSleepTime = Math.max(1, Integer.valueOf(sleepTimeString)); + } catch (Exception e) { + logger.error(sleepTimeString + ": Illegal value for 'minSleepTime'", e); + } + } + + // fetch 'maxSleepTime' value, and update if defined + long maxSleepTime = 2 * halfMaxSleepTime; + sleepTimeString = getProperty(properties, name, "maxSleepTime"); + if (sleepTimeString != null) { + try { + maxSleepTime = Math.max(1, Integer.valueOf(sleepTimeString)); + } catch (Exception e) { + logger.error(sleepTimeString + ": Illegal value for 'maxSleepTime'", e); + } + } + + // swap values if needed + if (minSleepTime > maxSleepTime) { + logger.error("minSleepTime(" + minSleepTime + ") is greater than maxSleepTime(" + maxSleepTime + + ") -- swapping"); + long tmp = minSleepTime; + minSleepTime = maxSleepTime; + maxSleepTime = tmp; + } + + halfMaxSleepTime = Math.max(1, maxSleepTime/2); + } + + /** + * @return the String to use as the thread name + */ + private String getThreadName() { + return ("Session " + session.getFullName() + " (persistent)"); + } + + /***************************/ + /* 'ThreadModel' interface */ + /***************************/ + + /** + * {@inheritDoc} + */ + @Override + public void start() { + thread.start(); + } + + /** + * {@inheritDoc} + */ + @Override + public void stop() { + // tell the thread to stop + stopped.countDown(); + + // wait up to 10 seconds for the thread to stop + try { + thread.join(10000); + + } catch (InterruptedException e) { + logger.error("stopThread exception: ", e); + } + + // verify that it's done + if(thread.isAlive()) { + logger.error("stopThread: still running"); + } + } + + /** + * {@inheritDoc} + */ + @Override + public void updated() { + // the container artifact has been updated -- adjust the thread name + thread.setName(getThreadName()); + } + + /************************/ + /* 'Runnable' interface */ + /************************/ + + /** + * {@inheritDoc} + */ + @Override + public void run() { + logger.info("PersistentThreadModel running"); + + // set thread local variable + session.setPolicySession(); + + KieSession kieSession = session.getKieSession(); + long sleepTime = 2 * halfMaxSleepTime; + + // We want to continue, despite any exceptions that occur + // while rules are fired. + + for(;;) { + + try { + if (kieSession.fireAllRules() > 0) { + // some rules fired -- reduce poll delay + sleepTime = Math.max(minSleepTime, sleepTime/2); + } else { + // no rules fired -- increase poll delay + sleepTime = 2 * Math.min(halfMaxSleepTime, sleepTime); + } + + } catch (Throwable e) { + logger.error("startThread exception: ", e); + } + + + try { + if(stopped.await(sleepTime, TimeUnit.MILLISECONDS)) { + break; + } + + } catch (InterruptedException e) { + logger.error("startThread exception: ", e); + break; + } + } + + logger.info("PersistentThreadModel completed"); + } + } + + /* ============================================================ */ + + /** + * Factory for various items. Methods can be overridden for junit testing. + */ + protected static class Factory { + + /** + * Gets the configuration for the transaction manager. + * + * @return the configuration for the transaction manager + */ + public Configuration getTransMgrConfig() { + return TransactionManagerServices.getConfiguration(); + } + + /** + * Gets the transaction manager. + * + * @return the transaction manager + */ + public BitronixTransactionManager getTransMgr() { + return TransactionManagerServices.getTransactionManager(); + } + + /** + * Gets the KIE services. + * + * @return the KIE services + */ + public KieServices getKieServices() { + return KieServices.Factory.get(); + } + + /** + * Gets the current host name. + * + * @return the current host name, associated with the IP address of the + * local machine + * @throws UnknownHostException + */ + public String getHostName() throws UnknownHostException { + return InetAddress.getLocalHost().getHostName(); + } + + /** + * Loads properties from a file. + * + * @param filenm + * name of the file to load + * @return properties, as loaded from the file + * @throws IOException + * if an error occurs reading from the file + */ + public Properties loadProperties(String filenm) throws IOException { + return PropertyUtil.getProperties(filenm); + } + + /** + * Makes a connection to the DB. + * + * @param url + * DB URL + * @param user + * user name + * @param pass + * password + * @return a new DB connection + * @throws SQLException + */ + public Connection makeDbConnection(String url, String user, String pass) throws SQLException { + + return DriverManager.getConnection(url, user, pass); + } + + /** + * Makes a new pooling data source. + * + * @return a new pooling data source + */ + public PoolingDataSource makePoolingDataSource() { + return new PoolingDataSource(); + } + + /** + * Makes a new JPA connector for drools sessions. + * + * @param pu + * PU for the entity manager factory + * @param propMap + * properties with which the factory should be configured + * @return a new JPA connector for drools sessions + */ + public DroolsSessionConnector makeJpaConnector(String pu, Properties propMap) { + + EntityManagerFactory emf = makeEntMgrFact(pu, propMap); + + return new JpaDroolsSessionConnector(emf); + } + + /** + * Makes a new entity manager factory. + * + * @param pu + * PU for the entity manager factory + * @param propMap + * properties with which the factory should be configured + * @return a new entity manager factory + */ + public EntityManagerFactory makeEntMgrFact(String pu, Properties propMap) { + return Persistence.createEntityManagerFactory(pu, propMap); + } + + /** + * Gets the policy controller associated with a given policy container. + * + * @param container + * container whose controller is to be retrieved + * @return the container's controller + */ + public PolicyController getPolicyContainer(PolicyContainer container) { + return PolicyController.factory.get(container.getGroupId(), container.getArtifactId()); + } + } +} |