diff options
Diffstat (limited to 'feature-session-persistence/src/main/java')
7 files changed, 1189 insertions, 1275 deletions
diff --git a/feature-session-persistence/src/main/java/org/onap/policy/drools/persistence/DroolsPersistenceProperties.java b/feature-session-persistence/src/main/java/org/onap/policy/drools/persistence/DroolsPersistenceProperties.java index c3f00359..4700e8ae 100644 --- a/feature-session-persistence/src/main/java/org/onap/policy/drools/persistence/DroolsPersistenceProperties.java +++ b/feature-session-persistence/src/main/java/org/onap/policy/drools/persistence/DroolsPersistenceProperties.java @@ -21,17 +21,17 @@ package org.onap.policy.drools.persistence; public class DroolsPersistenceProperties { - /* - * feature-session-persistence.properties parameter key values - */ - public static final String DB_DRIVER = "javax.persistence.jdbc.driver"; - public static final String DB_URL = "javax.persistence.jdbc.url"; - public static final String DB_USER = "javax.persistence.jdbc.user"; - public static final String DB_PWD = "javax.persistence.jdbc.password"; - public static final String DB_SESSIONINFO_TIMEOUT = "persistence.sessioninfo.timeout"; - public static final String JTA_OBJECTSTORE_DIR = "persistence.objectstore.dir"; + /* + * feature-session-persistence.properties parameter key values + */ + public static final String DB_DRIVER = "javax.persistence.jdbc.driver"; + public static final String DB_URL = "javax.persistence.jdbc.url"; + public static final String DB_USER = "javax.persistence.jdbc.user"; + public static final String DB_PWD = "javax.persistence.jdbc.password"; + public static final String DB_SESSIONINFO_TIMEOUT = "persistence.sessioninfo.timeout"; + public static final String JTA_OBJECTSTORE_DIR = "persistence.objectstore.dir"; - private DroolsPersistenceProperties() { - super(); - } + private DroolsPersistenceProperties() { + super(); + } } diff --git a/feature-session-persistence/src/main/java/org/onap/policy/drools/persistence/DroolsSession.java b/feature-session-persistence/src/main/java/org/onap/policy/drools/persistence/DroolsSession.java index a012f3c2..e8b50817 100644 --- a/feature-session-persistence/src/main/java/org/onap/policy/drools/persistence/DroolsSession.java +++ b/feature-session-persistence/src/main/java/org/onap/policy/drools/persistence/DroolsSession.java @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * feature-session-persistence * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -24,12 +24,19 @@ import java.util.Date; public interface DroolsSession { - public String getSessionName(); - public void setSessionName(String sessionName); - public long getSessionId(); - public void setSessionId(long sessionId); - public Date getCreatedDate(); - public void setCreatedDate(Date createdDate); - public Date getUpdatedDate(); - public void setUpdatedDate(Date updatedDate); + public String getSessionName(); + + public void setSessionName(String sessionName); + + public long getSessionId(); + + public void setSessionId(long sessionId); + + public Date getCreatedDate(); + + public void setCreatedDate(Date createdDate); + + public Date getUpdatedDate(); + + public void setUpdatedDate(Date updatedDate); } diff --git a/feature-session-persistence/src/main/java/org/onap/policy/drools/persistence/DroolsSessionConnector.java b/feature-session-persistence/src/main/java/org/onap/policy/drools/persistence/DroolsSessionConnector.java index e2f4ac54..3a8885f8 100644 --- a/feature-session-persistence/src/main/java/org/onap/policy/drools/persistence/DroolsSessionConnector.java +++ b/feature-session-persistence/src/main/java/org/onap/policy/drools/persistence/DroolsSessionConnector.java @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * feature-session-persistence * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,19 +20,18 @@ package org.onap.policy.drools.persistence; -public interface DroolsSessionConnector -{ - /** - * Gets a session by PDP id and name. - * @param sessName - * @return a session, or {@code null} if it is not found - */ - public DroolsSession get(String sessName); - - /** - * Replaces a session, adding it if it does not exist. - * @param sess session to be replaced - */ - public void replace(DroolsSession sess); +public interface DroolsSessionConnector { + /** + * Gets a session by PDP id and name. + * @param sessName session name + * @return a session, or {@code null} if it is not found + */ + public DroolsSession get(String sessName); + + /** + * Replaces a session, adding it if it does not exist. + * @param sess session to be replaced + */ + public void replace(DroolsSession sess); } diff --git a/feature-session-persistence/src/main/java/org/onap/policy/drools/persistence/DroolsSessionEntity.java b/feature-session-persistence/src/main/java/org/onap/policy/drools/persistence/DroolsSessionEntity.java index b3616c43..117e6410 100644 --- a/feature-session-persistence/src/main/java/org/onap/policy/drools/persistence/DroolsSessionEntity.java +++ b/feature-session-persistence/src/main/java/org/onap/policy/drools/persistence/DroolsSessionEntity.java @@ -2,14 +2,14 @@ * ============LICENSE_START======================================================= * feature-session-persistence * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -34,105 +34,101 @@ import javax.persistence.TemporalType; @Entity public class DroolsSessionEntity implements Serializable, DroolsSession { - private static final long serialVersionUID = -5495057038819948709L; - - @Id - @Column(name = "sessionName", nullable = false) - private String sessionName = "-1"; - - @Column(name = "sessionId", nullable = false) - private long sessionId = -1L; - - @Temporal(TemporalType.TIMESTAMP) - @Column(name = "createdDate", nullable = false) - private Date createdDate; - - @Temporal(TemporalType.TIMESTAMP) - @Column(name = "updatedDate", nullable = false) - private Date updatedDate; - - public DroolsSessionEntity() { - - } - - public DroolsSessionEntity(String sessionName, long sessionId) { - this.sessionName = sessionName; - this.sessionId = sessionId; - - } - - @PrePersist - public void prePersist() { - this.createdDate = new Date(); - this.updatedDate = new Date(); - } - - @PreUpdate - public void preUpdate() { - this.updatedDate = new Date(); - } - - @Override - public String getSessionName() { - return sessionName; - } - - @Override - public void setSessionName(String sessionName) { - this.sessionName = sessionName; - } - - @Override - public long getSessionId() { - return sessionId; - } - - @Override - public void setSessionId(long sessionId) { - this.sessionId = sessionId; - } - - @Override - public Date getCreatedDate() { - return createdDate; - } - - @Override - public void setCreatedDate(Date createdDate) { - this.createdDate = createdDate; - } - - @Override - public Date getUpdatedDate() { - return updatedDate; - } - - @Override - public void setUpdatedDate(Date updatedDate) { - this.updatedDate = updatedDate; - } - - @Override - public boolean equals(Object other) { - if (other instanceof DroolsSession) { - DroolsSession p = (DroolsSession) other; - return this.getSessionName().equals(p.getSessionName()); - } else { - return false; - } - } - - @Override - public int hashCode() { - final int prime = 31; - int result = 1; - result = prime * result + getSessionName().hashCode(); - return result; - } - - @Override - public String toString() { - return "{name=" + getSessionName() + ", id=" + getSessionId() + "}"; - } - + private static final long serialVersionUID = -5495057038819948709L; + + @Id + @Column(name = "sessionName", nullable = false) + private String sessionName = "-1"; + + @Column(name = "sessionId", nullable = false) + private long sessionId = -1L; + + @Temporal(TemporalType.TIMESTAMP) + @Column(name = "createdDate", nullable = false) + private Date createdDate; + + @Temporal(TemporalType.TIMESTAMP) + @Column(name = "updatedDate", nullable = false) + private Date updatedDate; + + public DroolsSessionEntity() {} + + public DroolsSessionEntity(String sessionName, long sessionId) { + this.sessionName = sessionName; + this.sessionId = sessionId; + } + + @PrePersist + public void prePersist() { + this.createdDate = new Date(); + this.updatedDate = new Date(); + } + + @PreUpdate + public void preUpdate() { + this.updatedDate = new Date(); + } + + @Override + public String getSessionName() { + return sessionName; + } + + @Override + public void setSessionName(String sessionName) { + this.sessionName = sessionName; + } + + @Override + public long getSessionId() { + return sessionId; + } + + @Override + public void setSessionId(long sessionId) { + this.sessionId = sessionId; + } + + @Override + public Date getCreatedDate() { + return createdDate; + } + + @Override + public void setCreatedDate(Date createdDate) { + this.createdDate = createdDate; + } + + @Override + public Date getUpdatedDate() { + return updatedDate; + } + + @Override + public void setUpdatedDate(Date updatedDate) { + this.updatedDate = updatedDate; + } + + @Override + public boolean equals(Object other) { + if (other instanceof DroolsSession) { + DroolsSession session = (DroolsSession) other; + return this.getSessionName().equals(session.getSessionName()); + } else { + return false; + } + } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + getSessionName().hashCode(); + return result; + } + + @Override + public String toString() { + return "{name=" + getSessionName() + ", id=" + getSessionId() + "}"; + } } diff --git a/feature-session-persistence/src/main/java/org/onap/policy/drools/persistence/EntityMgrTrans.java b/feature-session-persistence/src/main/java/org/onap/policy/drools/persistence/EntityMgrTrans.java index 47ed221e..3d276519 100644 --- a/feature-session-persistence/src/main/java/org/onap/policy/drools/persistence/EntityMgrTrans.java +++ b/feature-session-persistence/src/main/java/org/onap/policy/drools/persistence/EntityMgrTrans.java @@ -7,9 +7,9 @@ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -32,109 +32,99 @@ import javax.transaction.UserTransaction; import org.onap.policy.common.utils.jpa.EntityMgrCloser; /** - * Wrapper for an <i>EntityManager</i> that creates a JTA transaction that is - * auto-rolled back when closed. + * Wrapper for an <i>EntityManager</i> that creates a JTA transaction that is auto-rolled back when + * closed. */ public class EntityMgrTrans extends EntityMgrCloser { - /** - * Transaction to be rolled back. - */ - private static UserTransaction userTrans = com.arjuna.ats.jta.UserTransaction.userTransaction(); - - /** - * - * @param em - * entity for which a transaction is to be begun - */ - public EntityMgrTrans(EntityManager em) { - super(em); - - try { - userTrans.begin(); - em.joinTransaction(); - - } catch (RuntimeException |NotSupportedException | SystemException e) { - em.close(); - throw new EntityMgrException(e); - } - } - - /** - * Gets the user transaction. For use by junit tests. - * - * @return the user transaction - */ - protected static UserTransaction getUserTrans() { - return userTrans; - } - - /** - * Sets the user transaction. For use by junit tests. - * - * @param userTrans - * the new user transaction - */ - protected static void setUserTrans(UserTransaction userTrans) { - EntityMgrTrans.userTrans = userTrans; - } - - /** - * Commits the transaction. - */ - public void commit() { - try { - userTrans.commit(); - - } catch (SecurityException | IllegalStateException | RollbackException | HeuristicMixedException - | HeuristicRollbackException | SystemException e) { - - throw new EntityMgrException(e); - } - } - - /** - * Rolls back the transaction. - */ - public void rollback() { - try { - userTrans.rollback(); - - } catch (IllegalStateException | SecurityException | SystemException e) { - throw new EntityMgrException(e); - } - } - - @Override - public void close() { - try { - if (userTrans.getStatus() == Status.STATUS_ACTIVE) { - userTrans.rollback(); - } - - } catch (IllegalStateException | SecurityException | SystemException e) { - throw new EntityMgrException(e); - - } finally { - super.close(); - } - } - - /** - * Runtime exceptions generated by this class. Wraps exceptions generated by - * delegated operations, particularly when they are not, themselves, Runtime - * exceptions. - */ - public static class EntityMgrException extends RuntimeException { - private static final long serialVersionUID = 1L; - - /** - * - * @param e - * exception to be wrapped - */ - public EntityMgrException(Exception e) { - super(e); - } - } + /** Transaction to be rolled back. */ + private static UserTransaction userTrans = com.arjuna.ats.jta.UserTransaction.userTransaction(); + + /** + * Constructor. + * + * @param em entity for which a transaction is to be begun */ + public EntityMgrTrans(EntityManager em) { + super(em); + + try { + userTrans.begin(); + em.joinTransaction(); + + } catch (RuntimeException | NotSupportedException | SystemException e) { + em.close(); + throw new EntityMgrException(e); + } + } + + /** + * Gets the user transaction. For use by junit tests. + * + * @return the user transaction + */ + protected static UserTransaction getUserTrans() { + return userTrans; + } + + /** + * Sets the user transaction. For use by junit tests. + * + * @param userTrans the new user transaction + */ + protected static void setUserTrans(UserTransaction userTrans) { + EntityMgrTrans.userTrans = userTrans; + } + + /** Commits the transaction. */ + public void commit() { + try { + userTrans.commit(); + + } catch (SecurityException + | IllegalStateException + | RollbackException + | HeuristicMixedException + | HeuristicRollbackException + | SystemException e) { + + throw new EntityMgrException(e); + } + } + + /** Rolls back the transaction. */ + public void rollback() { + try { + userTrans.rollback(); + + } catch (IllegalStateException | SecurityException | SystemException e) { + throw new EntityMgrException(e); + } + } + + @Override + public void close() { + try { + if (userTrans.getStatus() == Status.STATUS_ACTIVE) { + userTrans.rollback(); + } + + } catch (IllegalStateException | SecurityException | SystemException e) { + throw new EntityMgrException(e); + + } finally { + super.close(); + } + } + + /** + * Runtime exceptions generated by this class. Wraps exceptions generated by delegated operations, + * particularly when they are not, themselves, Runtime exceptions. + */ + public static class EntityMgrException extends RuntimeException { + private static final long serialVersionUID = 1L; + + public EntityMgrException(Exception ex) { + super(ex); + } + } } diff --git a/feature-session-persistence/src/main/java/org/onap/policy/drools/persistence/JpaDroolsSessionConnector.java b/feature-session-persistence/src/main/java/org/onap/policy/drools/persistence/JpaDroolsSessionConnector.java index cd76ae8d..381a896e 100644 --- a/feature-session-persistence/src/main/java/org/onap/policy/drools/persistence/JpaDroolsSessionConnector.java +++ b/feature-session-persistence/src/main/java/org/onap/policy/drools/persistence/JpaDroolsSessionConnector.java @@ -2,14 +2,14 @@ * ============LICENSE_START======================================================= * feature-session-persistence * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -28,89 +28,84 @@ import org.slf4j.LoggerFactory; public class JpaDroolsSessionConnector implements DroolsSessionConnector { - private static Logger logger = LoggerFactory.getLogger(JpaDroolsSessionConnector.class); + private static Logger logger = LoggerFactory.getLogger(JpaDroolsSessionConnector.class); - private final EntityManagerFactory emf; + private final EntityManagerFactory emf; - public JpaDroolsSessionConnector(EntityManagerFactory emf) { - this.emf = emf; - } + public JpaDroolsSessionConnector(EntityManagerFactory emf) { + this.emf = emf; + } - @Override - public DroolsSession get(String sessName) { + @Override + public DroolsSession get(String sessName) { - EntityManager em = emf.createEntityManager(); - DroolsSessionEntity s = null; + EntityManager em = emf.createEntityManager(); + DroolsSessionEntity entity = null; - try (EntityMgrTrans trans = new EntityMgrTrans(em)) { + try (EntityMgrTrans trans = new EntityMgrTrans(em)) { - s = em.find(DroolsSessionEntity.class, sessName); - if (s != null) { - em.refresh(s); - } + entity = em.find(DroolsSessionEntity.class, sessName); + if (entity != null) { + em.refresh(entity); + } - trans.commit(); - } + trans.commit(); + } - return s; - } + return entity; + } - @Override - public void replace(DroolsSession sess) { - String sessName = sess.getSessionName(); + @Override + public void replace(DroolsSession sess) { + String sessName = sess.getSessionName(); - logger.info("replace: Entering and manually updating session name= {}", sessName); + logger.info("replace: Entering and manually updating session name= {}", sessName); - EntityManager em = emf.createEntityManager(); + EntityManager em = emf.createEntityManager(); - try (EntityMgrTrans trans = new EntityMgrTrans(em)) { + try (EntityMgrTrans trans = new EntityMgrTrans(em)) { - if (!update(em, sess)) { - add(em, sess); - } + if (!update(em, sess)) { + add(em, sess); + } - trans.commit(); - } + trans.commit(); + } - logger.info("replace: Exiting"); - } + logger.info("replace: Exiting"); + } - /** - * Adds a session to the persistent store. - * - * @param em - * entity manager - * @param sess - * session to be added - */ - private void add(EntityManager em, DroolsSession sess) { - logger.info("add: Inserting session id={}", sess.getSessionId()); + /** + * Adds a session to the persistent store. + * + * @param em entity manager + * @param sess session to be added + */ + private void add(EntityManager em, DroolsSession sess) { + logger.info("add: Inserting session id={}", sess.getSessionId()); - DroolsSessionEntity ent = new DroolsSessionEntity(sess.getSessionName(), sess.getSessionId()); + DroolsSessionEntity ent = new DroolsSessionEntity(sess.getSessionName(), sess.getSessionId()); - em.persist(ent); - } + em.persist(ent); + } - /** - * Updates a session, if it exists within the persistent store. - * - * @param em - * entity manager - * @param sess - * session data to be persisted - * @return {@code true} if a record was updated, {@code false} if it was not - * found - */ - private boolean update(EntityManager em, DroolsSession sess) { + /** + * Updates a session, if it exists within the persistent store. + * + * @param em entity manager + * @param sess session data to be persisted + * @return {@code true} if a record was updated, {@code false} if it was not found + */ + private boolean update(EntityManager em, DroolsSession sess) { - DroolsSessionEntity s = em.find(DroolsSessionEntity.class, sess.getSessionName()); - if (s == null) { - return false; - } + DroolsSessionEntity s = em.find(DroolsSessionEntity.class, sess.getSessionName()); + if (s == null) { + return false; + } - logger.info("update: Updating session id to {}", sess.getSessionId()); - s.setSessionId(sess.getSessionId()); + logger.info("update: Updating session id to {}", sess.getSessionId()); + s.setSessionId(sess.getSessionId()); - return true; - } + return true; + } } diff --git a/feature-session-persistence/src/main/java/org/onap/policy/drools/persistence/PersistenceFeature.java b/feature-session-persistence/src/main/java/org/onap/policy/drools/persistence/PersistenceFeature.java index 129be8a7..7b17de6c 100644 --- a/feature-session-persistence/src/main/java/org/onap/policy/drools/persistence/PersistenceFeature.java +++ b/feature-session-persistence/src/main/java/org/onap/policy/drools/persistence/PersistenceFeature.java @@ -7,9 +7,9 @@ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -54,971 +54,898 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * If this feature is supported, there is a single instance of it. It adds - * persistence to Drools sessions. In addition, if an active-standby feature - * exists, then that is used to determine the active and last-active PDP. If it - * does not exist, then the current host name is used as the PDP id. + * If this feature is supported, there is a single instance of it. It adds persistence to Drools + * sessions. In addition, if an active-standby feature exists, then that is used to determine the + * active and last-active PDP. If it does not exist, then the current host name is used as the PDP + * id. * - * The bulk of the code here was once in other classes, such as - * 'PolicyContainer' and 'Main'. It was moved here as part of making this a - * separate optional feature. + * <p>The bulk of the code here was once in other classes, such as 'PolicyContainer' and 'Main'. It + * was moved here as part of making this a separate optional feature. */ public class PersistenceFeature implements PolicySessionFeatureAPI, PolicyEngineFeatureAPI { - private static final Logger logger = LoggerFactory.getLogger(PersistenceFeature.class); - - /** - * Standard factory used to get various items. - */ - private static Factory stdFactory = new Factory(); - - /** - * Factory used to get various items. - */ - private Factory fact = stdFactory; - - /** - * KieService factory. - */ - private KieServices kieSvcFact; - - /** - * Persistence properties. - */ - private Properties persistProps; - - /** - * Whether or not the SessionInfo records should be cleaned out. - */ - private boolean sessInfoCleaned; - - /** - * SessionInfo timeout, in milli-seconds, as read from - * {@link #persistProps}. - */ - private long sessionInfoTimeoutMs; - - /** - * Object used to serialize cleanup of sessioninfo table. - */ - private Object cleanupLock = new Object(); - - /** - * Sets the factory to be used during junit testing. - * - * @param fact - * factory to be used - */ - protected void setFactory(Factory fact) { - this.fact = fact; - } - - /** - * Lookup the adjunct for this feature that is associated with the specified - * PolicyContainer. If not found, create one. - * - * @param policyContainer - * the container whose adjunct we are looking up, and possibly - * creating - * @return the associated 'ContainerAdjunct' instance, which may be new - */ - private ContainerAdjunct getContainerAdjunct(PolicyContainer policyContainer) { - - Object rval = policyContainer.getAdjunct(this); - - if (rval == null || !(rval instanceof ContainerAdjunct)) { - // adjunct does not exist, or has the wrong type (should never - // happen) - rval = new ContainerAdjunct(policyContainer); - policyContainer.setAdjunct(this, rval); - } - - return (ContainerAdjunct) rval; - } - - /** - * {@inheritDoc} - */ - @Override - public int getSequenceNumber() { - return 1; - } - - /** - * {@inheritDoc} - */ - @Override - public void globalInit(String[] args, String configDir) { - - kieSvcFact = fact.getKieServices(); - - try { - persistProps = fact.loadProperties(configDir + "/feature-session-persistence.properties"); - - } catch (IOException e1) { - logger.error("initializePersistence: ", e1); - } - - sessionInfoTimeoutMs = getPersistenceTimeout(); - } - - /** - * Creates a persistent KieSession, loading it from the persistent store, or - * creating one, if it does not exist yet. - */ - @Override - public KieSession activatePolicySession(PolicyContainer policyContainer, String name, String kieBaseName) { - - if (isPersistenceEnabled(policyContainer, name)) { - cleanUpSessionInfo(); - - return getContainerAdjunct(policyContainer).newPersistentKieSession(name, kieBaseName); - } - - return null; - } - - /** - * {@inheritDoc} - */ - @Override - public PolicySession.ThreadModel selectThreadModel(PolicySession session) { - - PolicyContainer policyContainer = session.getPolicyContainer(); - if (isPersistenceEnabled(policyContainer, session.getName())) { - return new PersistentThreadModel(session, getProperties(policyContainer)); - } - return null; - } - - /** - * {@inheritDoc} - */ - @Override - public void disposeKieSession(PolicySession policySession) { - - ContainerAdjunct contAdj = (ContainerAdjunct) policySession.getPolicyContainer().getAdjunct(this); - if (contAdj != null) { - contAdj.disposeKieSession(policySession.getName()); - } - } - - /** - * {@inheritDoc} - */ - @Override - public void destroyKieSession(PolicySession policySession) { - - ContainerAdjunct contAdj = (ContainerAdjunct) policySession.getPolicyContainer().getAdjunct(this); - if (contAdj != null) { - contAdj.destroyKieSession(policySession.getName()); - } - } - - /** - * {@inheritDoc} - */ - @Override - public boolean afterStart(PolicyEngine engine) { - return false; - } - - /** - * {@inheritDoc} - */ - @Override - public boolean beforeStart(PolicyEngine engine) { - synchronized (cleanupLock) { - sessInfoCleaned = false; - } - - return false; - } - - /** - * {@inheritDoc} - */ - @Override - public boolean beforeActivate(PolicyEngine engine) { - synchronized (cleanupLock) { - sessInfoCleaned = false; - } - - return false; - } - - /** - * {@inheritDoc} - */ - @Override - public boolean afterActivate(PolicyEngine engine) { - return false; - } - - /* ============================================================ */ - - /** - * Gets the persistence timeout value for sessioninfo records. - * - * @return the timeout value, in milli-seconds, or {@code -1} if it is - * unspecified or invalid - */ - private long getPersistenceTimeout() { - String timeoutString = null; - - try { - timeoutString = persistProps.getProperty(DroolsPersistenceProperties.DB_SESSIONINFO_TIMEOUT); - - if (timeoutString != null) { - // timeout parameter is specified - return Long.valueOf(timeoutString) * 1000; - } - - } catch (NumberFormatException e) { - logger.error("Invalid value for Drools persistence property persistence.sessioninfo.timeout: {}", - timeoutString, e); - } - - return -1; - } - - /* ============================================================ */ - - /** - * Each instance of this class is a logical extension of a 'PolicyContainer' - * instance. Its reference is stored in the 'adjuncts' table within the - * 'PolicyContainer', and will be garbage-collected with the container. - */ - protected class ContainerAdjunct { - /** - * 'PolicyContainer' instance that this adjunct is extending. - */ - private PolicyContainer policyContainer; - - /** - * Maps a KIE session name to its data source. - */ - private Map<String, DsEmf> name2ds = new HashMap<>(); - - /** - * Constructor - initialize a new 'ContainerAdjunct' - * - * @param policyContainer - * the 'PolicyContainer' instance this adjunct is extending - */ - private ContainerAdjunct(PolicyContainer policyContainer) { - this.policyContainer = policyContainer; - } - - /** - * Create a new persistent KieSession. If there is already a - * corresponding entry in the database, it is used to initialize the - * KieSession. If not, a completely new session is created. - * - * @param name - * the name of the KieSession (which is also the name of the - * associated PolicySession) - * @param kieBaseName - * the name of the 'KieBase' instance containing this session - * @return a new KieSession with persistence enabled - */ - private KieSession newPersistentKieSession(String name, String kieBaseName) { - - configureSysProps(); - - BasicDataSource ds = fact.makeDataSource(getDataSourceProperties()); - DsEmf dsemf = new DsEmf(ds); - - try { - EntityManagerFactory emf = dsemf.emf; - DroolsSessionConnector conn = fact.makeJpaConnector(emf); - - long desiredSessionId = getSessionId(conn, name); - - logger.info("\n\nThis controller is primary... coming up with session {} \n\n", desiredSessionId); - - // session does not exist -- attempt to create one - logger.info("getPolicySession:session does not exist -- attempt to create one with name {}", name); - - Environment env = kieSvcFact.newEnvironment(); - - configureKieEnv(env, emf); - - KieSessionConfiguration kConf = kieSvcFact.newKieSessionConfiguration(); - - KieSession kieSession = (desiredSessionId >= 0 - ? loadKieSession(kieBaseName, desiredSessionId, env, kConf) : null); - - if (kieSession == null) { - // loadKieSession() returned null or desiredSessionId < 0 - logger.info("LOADING We cannot load session {}. Going to create a new one", desiredSessionId); - - kieSession = newKieSession(kieBaseName, env); - } - - replaceSession(conn, name, kieSession); - - name2ds.put(name, dsemf); - - return kieSession; - - } catch (RuntimeException e) { - dsemf.close(); - throw e; - } - } - - /** - * Loads an existing KieSession from the persistent store. - * - * @param kieBaseName - * the name of the 'KieBase' instance containing this session - * @param desiredSessionId - * id of the desired KieSession - * @param env - * Kie Environment for the session - * @param kConf - * Kie Configuration for the session - * @return the persistent session, or {@code null} if it could not be - * loaded - */ - private KieSession loadKieSession(String kieBaseName, long desiredSessionId, Environment env, - KieSessionConfiguration kConf) { - try { - KieSession kieSession = kieSvcFact.getStoreServices().loadKieSession(desiredSessionId, - policyContainer.getKieContainer().getKieBase(kieBaseName), kConf, env); - - logger.info("LOADING Loaded session {}", desiredSessionId); - - return kieSession; - - } catch (Exception e) { - logger.error("loadKieSession error: ", e); - return null; - } - } - - /** - * Creates a new, persistent KieSession. - * - * @param kieBaseName - * the name of the 'KieBase' instance containing this session - * @param env - * Kie Environment for the session - * @return a new, persistent session - */ - private KieSession newKieSession(String kieBaseName, Environment env) { - KieSession kieSession = kieSvcFact.getStoreServices() - .newKieSession(policyContainer.getKieContainer().getKieBase(kieBaseName), null, env); - - logger.info("LOADING CREATED {}", kieSession.getIdentifier()); - - return kieSession; - } - - /** - * Closes the data source associated with a session. - * - * @param name - * name of the session being destroyed - */ - private void destroyKieSession(String name) { - closeDataSource(name); - } - - /** - * Closes the data source associated with a session. - * - * @param name - * name of the session being disposed of - */ - private void disposeKieSession(String name) { - closeDataSource(name); - } - - /** - * Closes the data source associated with a session. - * - * @param name - * name of the session whose data source is to be closed - */ - private void closeDataSource(String name) { - DsEmf ds = name2ds.remove(name); - if (ds != null) { - ds.close(); - } - } - - /** - * Configures java system properties for JPA/JTA. - */ - private void configureSysProps() { - System.setProperty("com.arjuna.ats.arjuna.coordinator.defaultTimeout", "60"); - System.setProperty("com.arjuna.ats.arjuna.objectstore.objectStoreDir", - persistProps.getProperty(DroolsPersistenceProperties.JTA_OBJECTSTORE_DIR)); - System.setProperty("ObjectStoreEnvironmentBean.objectStoreDir", - persistProps.getProperty(DroolsPersistenceProperties.JTA_OBJECTSTORE_DIR)); - } - - /** - * Configures a Kie Environment - * - * @param env - * environment to be configured - * @param emf - * entity manager factory - */ - private void configureKieEnv(Environment env, EntityManagerFactory emf) { - env.set(EnvironmentName.ENTITY_MANAGER_FACTORY, emf); - env.set(EnvironmentName.TRANSACTION, fact.getUserTrans()); - env.set(EnvironmentName.TRANSACTION_SYNCHRONIZATION_REGISTRY, fact.getTransSyncReg()); - env.set(EnvironmentName.TRANSACTION_MANAGER, fact.getTransMgr()); - } - - /** - * Gets a session's ID from the persistent store. - * - * @param conn - * persistence connector - * @param sessnm - * name of the session - * @return the session's id, or {@code -1} if the session is not found - */ - private long getSessionId(DroolsSessionConnector conn, String sessnm) { - DroolsSession sess = conn.get(sessnm); - return sess != null ? sess.getSessionId() : -1; - } - - /** - * Replaces a session within the persistent store, if it exists. Adds it - * otherwise. - * - * @param conn - * persistence connector - * @param sessnm - * name of session to be updated - * @param kieSession - * new session information - */ - private void replaceSession(DroolsSessionConnector conn, String sessnm, KieSession kieSession) { - - DroolsSessionEntity sess = new DroolsSessionEntity(); - - sess.setSessionName(sessnm); - sess.setSessionId(kieSession.getIdentifier()); - - conn.replace(sess); - } - } - - /* ============================================================ */ - - /** - * Gets the data source properties. - * - * @return the data source properties - */ - private Properties getDataSourceProperties() { - Properties props = new Properties(); - props.put("driverClassName", persistProps.getProperty(DroolsPersistenceProperties.DB_DRIVER)); - props.put("url", persistProps.getProperty(DroolsPersistenceProperties.DB_URL)); - props.put("username", persistProps.getProperty(DroolsPersistenceProperties.DB_USER)); - props.put("password", persistProps.getProperty(DroolsPersistenceProperties.DB_PWD)); - props.put("maxActive", "3"); - props.put("maxIdle", "1"); - props.put("maxWait", "120000"); - props.put("whenExhaustedAction", "2"); - props.put("testOnBorrow", "false"); - props.put("poolPreparedStatements", "true"); - - return props; - } - - /** - * Removes "old" Drools 'sessioninfo' records, so they aren't used to - * restore data to Drools sessions. This also has the useful side-effect of - * removing abandoned records as well. - */ - private void cleanUpSessionInfo() { - - synchronized (cleanupLock) { - - if (sessInfoCleaned) { - logger.info("Clean up of sessioninfo table: already done"); - return; - } - - if (sessionInfoTimeoutMs < 0) { - logger.info("Clean up of sessioninfo table: no timeout specified"); - return; - } - - // now do the record deletion - try (BasicDataSource ds = fact.makeDataSource(getDataSourceProperties()); - Connection connection = ds.getConnection(); - PreparedStatement statement = connection.prepareStatement( - "DELETE FROM sessioninfo WHERE timestampdiff(second,lastmodificationdate,now()) > ?")) { - - connection.setAutoCommit(true); - - statement.setLong(1, sessionInfoTimeoutMs / 1000); - - int count = statement.executeUpdate(); - logger.info("Cleaning up sessioninfo table -- {} records removed", count); - - } catch (SQLException e) { - logger.error("Clean up of sessioninfo table failed", e); - } - - // TODO: delete DroolsSessionEntity where sessionId not in - // (sessinfo.xxx) - - sessInfoCleaned = true; - } - } - - /** - * Determine whether persistence is enabled for a specific container - * - * @param container - * container to be checked - * @param sessionName - * name of the session to be checked - * @return {@code true} if persistence is enabled for this container, and - * {@code false} if not - */ - private boolean isPersistenceEnabled(PolicyContainer container, String sessionName) { - Properties properties = getProperties(container); - boolean rval = false; - - if (properties != null) { - // fetch the 'type' property - String type = getProperty(properties, sessionName, "type"); - rval = "auto".equals(type) || "native".equals(type); - } - - return rval; - } - - /** - * Determine the controller properties associated with the policy container. - * - * @param container - * container whose properties are to be retrieved - * @return the container's properties, or {@code null} if not found - */ - private Properties getProperties(PolicyContainer container) { - try { - return fact.getPolicyController(container).getProperties(); - } catch (IllegalArgumentException e) { - logger.error("getProperties exception: ", e); - return null; - } - } - - /** - * Fetch the persistence property associated with a session. The name may - * have the form: - * <ul> - * <li>persistence.SESSION-NAME.PROPERTY</li> - * <li>persistence.PROPERTY</li> - * </ul> - * - * @param properties - * properties from which the value is to be retrieved - * @param sessionName - * session name of interest - * @param property - * property name of interest - * @return the property value, or {@code null} if not found - */ - private String getProperty(Properties properties, String sessionName, String property) { - String value = properties.getProperty("persistence." + sessionName + "." + property); - if (value == null) { - value = properties.getProperty("persistence." + property); - } - - return value; - } - - /* ============================================================ */ - - /** - * This 'ThreadModel' variant periodically calls - * 'KieSession.fireAllRules()', because the 'fireUntilHalt' method isn't - * compatible with persistence. - */ - public class PersistentThreadModel implements Runnable, PolicySession.ThreadModel { - - /** - * Session associated with this persistent thread. - */ - private final PolicySession session; - - /** - * The session thread. - */ - private final Thread thread; - - /** - * Used to indicate that processing should stop. - */ - private final CountDownLatch stopped = new CountDownLatch(1); - - /** - * Minimum time, in milli-seconds, that the thread should sleep before - * firing rules again. - */ - long minSleepTime = 100; - - /** - * Maximum time, in milli-seconds, that the thread should sleep before - * firing rules again. This is a "half" time, so that we can multiply it - * by two without overflowing the word size. - */ - long halfMaxSleepTime = 5000L / 2L; - - /** - * Constructor - initialize variables and create thread - * - * @param session - * the 'PolicySession' instance - * @param properties - * may contain additional session properties - */ - public PersistentThreadModel(PolicySession session, Properties properties) { - this.session = session; - this.thread = new Thread(this, getThreadName()); - - if (properties == null) { - return; - } - - // extract 'minSleepTime' and/or 'maxSleepTime' - String name = session.getName(); - - // fetch 'minSleepTime' value, and update if defined - String sleepTimeString = getProperty(properties, name, "minSleepTime"); - if (sleepTimeString != null) { - try { - minSleepTime = Math.max(1, Integer.valueOf(sleepTimeString)); - } catch (Exception e) { - logger.error(sleepTimeString + ": Illegal value for 'minSleepTime'", e); - } - } - - // fetch 'maxSleepTime' value, and update if defined - long maxSleepTime = 2 * halfMaxSleepTime; - sleepTimeString = getProperty(properties, name, "maxSleepTime"); - if (sleepTimeString != null) { - try { - maxSleepTime = Math.max(1, Integer.valueOf(sleepTimeString)); - } catch (Exception e) { - logger.error(sleepTimeString + ": Illegal value for 'maxSleepTime'", e); - } - } - - // swap values if needed - if (minSleepTime > maxSleepTime) { - logger.error("minSleepTime(" + minSleepTime + ") is greater than maxSleepTime(" + maxSleepTime - + ") -- swapping"); - long tmp = minSleepTime; - minSleepTime = maxSleepTime; - maxSleepTime = tmp; - } - - halfMaxSleepTime = Math.max(1, maxSleepTime / 2); - } - - /** - * @return the String to use as the thread name - */ - private String getThreadName() { - return "Session " + session.getFullName() + " (persistent)"; - } - - /***************************/ - /* 'ThreadModel' interface */ - /***************************/ - - /** - * {@inheritDoc} - */ - @Override - public void start() { - thread.start(); - } - - /** - * {@inheritDoc} - */ - @Override - public void stop() { - // tell the thread to stop - stopped.countDown(); - - // wait up to 10 seconds for the thread to stop - try { - thread.join(10000); - - } catch (InterruptedException e) { - logger.error("stopThread exception: ", e); - Thread.currentThread().interrupt(); - } - - // verify that it's done - if (thread.isAlive()) { - logger.error("stopThread: still running"); - } - } - - /** - * {@inheritDoc} - */ - @Override - public void updated() { - // the container artifact has been updated -- adjust the thread name - thread.setName(getThreadName()); - } - - /************************/ - /* 'Runnable' interface */ - /************************/ - - /** - * {@inheritDoc} - */ - @Override - public void run() { - logger.info("PersistentThreadModel running"); - - // set thread local variable - session.setPolicySession(); - - KieSession kieSession = session.getKieSession(); - long sleepTime = 2 * halfMaxSleepTime; - - // We want to continue, despite any exceptions that occur - // while rules are fired. - - boolean cont = true; - while(cont) { - - try { - if (kieSession.fireAllRules() > 0) { - // some rules fired -- reduce poll delay - sleepTime = Math.max(minSleepTime, sleepTime / 2); - } else { - // no rules fired -- increase poll delay - sleepTime = 2 * Math.min(halfMaxSleepTime, sleepTime); - } - - } catch (Exception | LinkageError e) { - logger.error("Exception during kieSession.fireAllRules", e); - } - - - try { - if (stopped.await(sleepTime, TimeUnit.MILLISECONDS)) { - cont = false; - } - - } catch (InterruptedException e) { - logger.error("startThread exception: ", e); - Thread.currentThread().interrupt(); - cont = false; - } - } - - logger.info("PersistentThreadModel completed"); - } - } - - /* ============================================================ */ - - /** - * DataSource-EntityManagerFactory pair. - */ - private class DsEmf { - private BasicDataSource bds; - private EntityManagerFactory emf; - - /** - * Makes an entity manager factory for the given data source. - * - * @param bds - * pooled data source - */ - public DsEmf(BasicDataSource bds) { - try { - Map<String, Object> props = new HashMap<>(); - props.put(org.hibernate.cfg.Environment.JPA_JTA_DATASOURCE, bds); - - this.bds = bds; - this.emf = fact.makeEntMgrFact(props); - - } catch (RuntimeException e) { - closeDataSource(); - throw e; - } - } - - /** - * Closes the entity manager factory and the data source. - */ - public void close() { - try { - emf.close(); - - } catch (RuntimeException e) { - closeDataSource(); - throw e; - } - - closeDataSource(); - } - - /** - * Closes the data source only. - */ - private void closeDataSource() { - try { - bds.close(); - - } catch (SQLException e) { - throw new PersistenceFeatureException(e); - } - - } - } - - private static class SingletonRegistry { - private static final TransactionSynchronizationRegistry transreg = new com.arjuna.ats.internal.jta.transaction.arjunacore.TransactionSynchronizationRegistryImple(); - - private SingletonRegistry() { - super(); - } - } - - /** - * Factory for various items. Methods can be overridden for junit testing. - */ - protected static class Factory { - - /** - * Gets the transaction manager. - * - * @return the transaction manager - */ - public TransactionManager getTransMgr() { - return com.arjuna.ats.jta.TransactionManager.transactionManager(); - } - - /** - * Gets the user transaction. - * - * @return the user transaction - */ - public UserTransaction getUserTrans() { - return com.arjuna.ats.jta.UserTransaction.userTransaction(); - } - - /** - * Gets the transaction synchronization registry. - * - * @return the transaction synchronization registry - */ - public TransactionSynchronizationRegistry getTransSyncReg() { - return SingletonRegistry.transreg; - } - - /** - * Gets the KIE services. - * - * @return the KIE services - */ - public KieServices getKieServices() { - return KieServices.Factory.get(); - } - - /** - * Loads properties from a file. - * - * @param filenm - * name of the file to load - * @return properties, as loaded from the file - * @throws IOException - * if an error occurs reading from the file - */ - public Properties loadProperties(String filenm) throws IOException { - return PropertyUtil.getProperties(filenm); - } - - /** - * Makes a Data Source. - * - * @param dsProps - * data source properties - * @return a new data source - */ - public BasicDataSource makeDataSource(Properties dsProps) { - try { - return BasicDataSourceFactory.createDataSource(dsProps); - - } catch (Exception e) { - throw new PersistenceFeatureException(e); - } - } - - /** - * Makes a new JPA connector for drools sessions. - * - * @param emf - * entity manager factory - * @return a new JPA connector for drools sessions - */ - public DroolsSessionConnector makeJpaConnector(EntityManagerFactory emf) { - return new JpaDroolsSessionConnector(emf); - } - - /** - * Makes a new entity manager factory. - * - * @param props - * properties with which the factory should be configured - * @return a new entity manager factory - */ - public EntityManagerFactory makeEntMgrFact(Map<String, Object> props) { - return Persistence.createEntityManagerFactory("onapsessionsPU", props); - } - - /** - * Gets the policy controller associated with a given policy container. - * - * @param container - * container whose controller is to be retrieved - * @return the container's controller - */ - public PolicyController getPolicyController(PolicyContainer container) { - return PolicyController.factory.get(container.getGroupId(), container.getArtifactId()); - } - } - - /** - * Runtime exceptions generated by this class. Wraps exceptions generated by - * delegated operations, particularly when they are not, themselves, Runtime - * exceptions. - */ - public static class PersistenceFeatureException extends RuntimeException { - private static final long serialVersionUID = 1L; - - /** - * - * @param e - * exception to be wrapped - */ - public PersistenceFeatureException(Exception e) { - super(e); - } - } + private static final Logger logger = LoggerFactory.getLogger(PersistenceFeature.class); + + /** Standard factory used to get various items. */ + private static Factory stdFactory = new Factory(); + + /** Factory used to get various items. */ + private Factory fact = stdFactory; + + /** KieService factory. */ + private KieServices kieSvcFact; + + /** Persistence properties. */ + private Properties persistProps; + + /** Whether or not the SessionInfo records should be cleaned out. */ + private boolean sessInfoCleaned; + + /** SessionInfo timeout, in milli-seconds, as read from + * {@link #persistProps}. */ + private long sessionInfoTimeoutMs; + + /** Object used to serialize cleanup of sessioninfo table. */ + private Object cleanupLock = new Object(); + + /** + * Sets the factory to be used during junit testing. + * + * @param fact factory to be used + */ + protected void setFactory(Factory fact) { + this.fact = fact; + } + + /** + * Lookup the adjunct for this feature that is associated with the specified PolicyContainer. If + * not found, create one. + * + * @param policyContainer the container whose adjunct we are looking up, and possibly creating + * @return the associated 'ContainerAdjunct' instance, which may be new + */ + private ContainerAdjunct getContainerAdjunct(PolicyContainer policyContainer) { + + Object rval = policyContainer.getAdjunct(this); + + if (rval == null || !(rval instanceof ContainerAdjunct)) { + // adjunct does not exist, or has the wrong type (should never + // happen) + rval = new ContainerAdjunct(policyContainer); + policyContainer.setAdjunct(this, rval); + } + + return (ContainerAdjunct) rval; + } + + /** + * {@inheritDoc} */ + @Override + public int getSequenceNumber() { + return 1; + } + + /** + * {@inheritDoc} */ + @Override + public void globalInit(String[] args, String configDir) { + + kieSvcFact = fact.getKieServices(); + + try { + persistProps = fact.loadProperties(configDir + "/feature-session-persistence.properties"); + + } catch (IOException e1) { + logger.error("initializePersistence: ", e1); + } + + sessionInfoTimeoutMs = getPersistenceTimeout(); + } + + /** + * Creates a persistent KieSession, loading it from the persistent store, or creating one, if it + * does not exist yet. + */ + @Override + public KieSession activatePolicySession( + PolicyContainer policyContainer, String name, String kieBaseName) { + + if (isPersistenceEnabled(policyContainer, name)) { + cleanUpSessionInfo(); + + return getContainerAdjunct(policyContainer).newPersistentKieSession(name, kieBaseName); + } + + return null; + } + + /** + * {@inheritDoc} */ + @Override + public PolicySession.ThreadModel selectThreadModel(PolicySession session) { + + PolicyContainer policyContainer = session.getPolicyContainer(); + if (isPersistenceEnabled(policyContainer, session.getName())) { + return new PersistentThreadModel(session, getProperties(policyContainer)); + } + return null; + } + + /** + * {@inheritDoc} */ + @Override + public void disposeKieSession(PolicySession policySession) { + + ContainerAdjunct contAdj = + (ContainerAdjunct) policySession.getPolicyContainer().getAdjunct(this); + if (contAdj != null) { + contAdj.disposeKieSession(policySession.getName()); + } + } + + /** + * {@inheritDoc} */ + @Override + public void destroyKieSession(PolicySession policySession) { + + ContainerAdjunct contAdj = + (ContainerAdjunct) policySession.getPolicyContainer().getAdjunct(this); + if (contAdj != null) { + contAdj.destroyKieSession(policySession.getName()); + } + } + + /** + * {@inheritDoc} */ + @Override + public boolean afterStart(PolicyEngine engine) { + return false; + } + + /** + * {@inheritDoc} */ + @Override + public boolean beforeStart(PolicyEngine engine) { + synchronized (cleanupLock) { + sessInfoCleaned = false; + } + + return false; + } + + /** + * {@inheritDoc} */ + @Override + public boolean beforeActivate(PolicyEngine engine) { + synchronized (cleanupLock) { + sessInfoCleaned = false; + } + + return false; + } + + /** + * {@inheritDoc} */ + @Override + public boolean afterActivate(PolicyEngine engine) { + return false; + } + + /* ============================================================ */ + + /** + * Gets the persistence timeout value for sessioninfo records. + * + * @return the timeout value, in milli-seconds, or {@code -1} if it is unspecified or invalid + */ + private long getPersistenceTimeout() { + String timeoutString = null; + + try { + timeoutString = persistProps.getProperty(DroolsPersistenceProperties.DB_SESSIONINFO_TIMEOUT); + + if (timeoutString != null) { + // timeout parameter is specified + return Long.valueOf(timeoutString) * 1000; + } + + } catch (NumberFormatException e) { + logger.error( + "Invalid value for Drools persistence property persistence.sessioninfo.timeout: {}", + timeoutString, + e); + } + + return -1; + } + + /* ============================================================ */ + + /** + * Each instance of this class is a logical extension of a 'PolicyContainer' instance. Its + * reference is stored in the 'adjuncts' table within the 'PolicyContainer', and will be + * garbage-collected with the container. + */ + protected class ContainerAdjunct { + /** 'PolicyContainer' instance that this adjunct is extending. */ + private PolicyContainer policyContainer; + + /** Maps a KIE session name to its data source. */ + private Map<String, DsEmf> name2ds = new HashMap<>(); + + /** + * Constructor - initialize a new 'ContainerAdjunct'. + * + * @param policyContainer the 'PolicyContainer' instance this adjunct is extending + */ + private ContainerAdjunct(PolicyContainer policyContainer) { + this.policyContainer = policyContainer; + } + + /** + * Create a new persistent KieSession. If there is already a corresponding entry in the + * database, it is used to initialize the KieSession. If not, a completely new session is + * created. + * + * @param name the name of the KieSession (which is also the name of the associated + * PolicySession) + * @param kieBaseName the name of the 'KieBase' instance containing this session + * @return a new KieSession with persistence enabled + */ + private KieSession newPersistentKieSession(String name, String kieBaseName) { + + configureSysProps(); + + BasicDataSource ds = fact.makeDataSource(getDataSourceProperties()); + DsEmf dsemf = new DsEmf(ds); + + try { + EntityManagerFactory emf = dsemf.emf; + DroolsSessionConnector conn = fact.makeJpaConnector(emf); + + long desiredSessionId = getSessionId(conn, name); + + logger.info( + "\n\nThis controller is primary... coming up with session {} \n\n", desiredSessionId); + + // session does not exist -- attempt to create one + logger.info( + "getPolicySession:session does not exist -- attempt to create one with name {}", name); + + Environment env = kieSvcFact.newEnvironment(); + + configureKieEnv(env, emf); + + KieSessionConfiguration kieConf = kieSvcFact.newKieSessionConfiguration(); + + KieSession kieSession = + (desiredSessionId >= 0 + ? loadKieSession(kieBaseName, desiredSessionId, env, kieConf) + : null); + + if (kieSession == null) { + // loadKieSession() returned null or desiredSessionId < 0 + logger.info( + "LOADING We cannot load session {}. Going to create a new one", desiredSessionId); + + kieSession = newKieSession(kieBaseName, env); + } + + replaceSession(conn, name, kieSession); + + name2ds.put(name, dsemf); + + return kieSession; + + } catch (RuntimeException e) { + dsemf.close(); + throw e; + } + } + + /** + * Loads an existing KieSession from the persistent store. + * + * @param kieBaseName the name of the 'KieBase' instance containing this session + * @param desiredSessionId id of the desired KieSession + * @param env Kie Environment for the session + * @param kConf Kie Configuration for the session + * @return the persistent session, or {@code null} if it could not be loaded + */ + private KieSession loadKieSession( + String kieBaseName, long desiredSessionId, Environment env, KieSessionConfiguration kieConf) { + try { + KieSession kieSession = + kieSvcFact + .getStoreServices() + .loadKieSession( + desiredSessionId, + policyContainer.getKieContainer().getKieBase(kieBaseName), + kieConf, + env); + + logger.info("LOADING Loaded session {}", desiredSessionId); + + return kieSession; + + } catch (Exception e) { + logger.error("loadKieSession error: ", e); + return null; + } + } + + /** + * Creates a new, persistent KieSession. + * + * @param kieBaseName the name of the 'KieBase' instance containing this session + * @param env Kie Environment for the session + * @return a new, persistent session + */ + private KieSession newKieSession(String kieBaseName, Environment env) { + KieSession kieSession = + kieSvcFact + .getStoreServices() + .newKieSession(policyContainer.getKieContainer().getKieBase(kieBaseName), null, env); + + logger.info("LOADING CREATED {}", kieSession.getIdentifier()); + + return kieSession; + } + + /** + * Closes the data source associated with a session. + * + * @param name name of the session being destroyed + */ + private void destroyKieSession(String name) { + closeDataSource(name); + } + + /** + * Closes the data source associated with a session. + * + * @param name name of the session being disposed of + */ + private void disposeKieSession(String name) { + closeDataSource(name); + } + + /** + * Closes the data source associated with a session. + * + * @param name name of the session whose data source is to be closed + */ + private void closeDataSource(String name) { + DsEmf ds = name2ds.remove(name); + if (ds != null) { + ds.close(); + } + } + + /** Configures java system properties for JPA/JTA. */ + private void configureSysProps() { + System.setProperty("com.arjuna.ats.arjuna.coordinator.defaultTimeout", "60"); + System.setProperty( + "com.arjuna.ats.arjuna.objectstore.objectStoreDir", + persistProps.getProperty(DroolsPersistenceProperties.JTA_OBJECTSTORE_DIR)); + System.setProperty( + "ObjectStoreEnvironmentBean.objectStoreDir", + persistProps.getProperty(DroolsPersistenceProperties.JTA_OBJECTSTORE_DIR)); + } + + /** + * Configures a Kie Environment. + * + * @param env environment to be configured + * @param emf entity manager factory + */ + private void configureKieEnv(Environment env, EntityManagerFactory emf) { + env.set(EnvironmentName.ENTITY_MANAGER_FACTORY, emf); + env.set(EnvironmentName.TRANSACTION, fact.getUserTrans()); + env.set(EnvironmentName.TRANSACTION_SYNCHRONIZATION_REGISTRY, fact.getTransSyncReg()); + env.set(EnvironmentName.TRANSACTION_MANAGER, fact.getTransMgr()); + } + + /** + * Gets a session's ID from the persistent store. + * + * @param conn persistence connector + * @param sessnm name of the session + * @return the session's id, or {@code -1} if the session is not found + */ + private long getSessionId(DroolsSessionConnector conn, String sessnm) { + DroolsSession sess = conn.get(sessnm); + return sess != null ? sess.getSessionId() : -1; + } + + /** + * Replaces a session within the persistent store, if it exists. Adds it otherwise. + * + * @param conn persistence connector + * @param sessnm name of session to be updated + * @param kieSession new session information + */ + private void replaceSession(DroolsSessionConnector conn, String sessnm, KieSession kieSession) { + + DroolsSessionEntity sess = new DroolsSessionEntity(); + + sess.setSessionName(sessnm); + sess.setSessionId(kieSession.getIdentifier()); + + conn.replace(sess); + } + } + + /* ============================================================ */ + + /** + * Gets the data source properties. + * + * @return the data source properties + */ + private Properties getDataSourceProperties() { + Properties props = new Properties(); + props.put("driverClassName", persistProps.getProperty(DroolsPersistenceProperties.DB_DRIVER)); + props.put("url", persistProps.getProperty(DroolsPersistenceProperties.DB_URL)); + props.put("username", persistProps.getProperty(DroolsPersistenceProperties.DB_USER)); + props.put("password", persistProps.getProperty(DroolsPersistenceProperties.DB_PWD)); + props.put("maxActive", "3"); + props.put("maxIdle", "1"); + props.put("maxWait", "120000"); + props.put("whenExhaustedAction", "2"); + props.put("testOnBorrow", "false"); + props.put("poolPreparedStatements", "true"); + + return props; + } + + /** + * Removes "old" Drools 'sessioninfo' records, so they aren't used to restore data to Drools + * sessions. This also has the useful side-effect of removing abandoned records as well. + */ + private void cleanUpSessionInfo() { + + synchronized (cleanupLock) { + if (sessInfoCleaned) { + logger.info("Clean up of sessioninfo table: already done"); + return; + } + + if (sessionInfoTimeoutMs < 0) { + logger.info("Clean up of sessioninfo table: no timeout specified"); + return; + } + + // now do the record deletion + try (BasicDataSource ds = fact.makeDataSource(getDataSourceProperties()); + Connection connection = ds.getConnection(); + PreparedStatement statement = + connection.prepareStatement( + "DELETE FROM sessioninfo WHERE timestampdiff(second,lastmodificationdate,now()) > ?")) { + + connection.setAutoCommit(true); + + statement.setLong(1, sessionInfoTimeoutMs / 1000); + + int count = statement.executeUpdate(); + logger.info("Cleaning up sessioninfo table -- {} records removed", count); + + } catch (SQLException e) { + logger.error("Clean up of sessioninfo table failed", e); + } + + // TODO: delete DroolsSessionEntity where sessionId not in + // (sessinfo.xxx) + + sessInfoCleaned = true; + } + } + + /** + * Determine whether persistence is enabled for a specific container. + * + * @param container container to be checked + * @param sessionName name of the session to be checked + * @return {@code true} if persistence is enabled for this container, and {@code false} if not + */ + private boolean isPersistenceEnabled(PolicyContainer container, String sessionName) { + Properties properties = getProperties(container); + boolean rval = false; + + if (properties != null) { + // fetch the 'type' property + String type = getProperty(properties, sessionName, "type"); + rval = "auto".equals(type) || "native".equals(type); + } + + return rval; + } + + /** + * Determine the controller properties associated with the policy container. + * + * @param container container whose properties are to be retrieved + * @return the container's properties, or {@code null} if not found + */ + private Properties getProperties(PolicyContainer container) { + try { + return fact.getPolicyController(container).getProperties(); + } catch (IllegalArgumentException e) { + logger.error("getProperties exception: ", e); + return null; + } + } + + /** + * Fetch the persistence property associated with a session. The name may have the form: + * + * <ul> + * <li>persistence.SESSION-NAME.PROPERTY + * <li>persistence.PROPERTY + * </ul> + * + * @param properties properties from which the value is to be retrieved + * @param sessionName session name of interest + * @param property property name of interest + * @return the property value, or {@code null} if not found + */ + private String getProperty(Properties properties, String sessionName, String property) { + String value = properties.getProperty("persistence." + sessionName + "." + property); + if (value == null) { + value = properties.getProperty("persistence." + property); + } + + return value; + } + + /* ============================================================ */ + + /** + * This 'ThreadModel' variant periodically calls 'KieSession.fireAllRules()', because the + * 'fireUntilHalt' method isn't compatible with persistence. + */ + public class PersistentThreadModel implements Runnable, PolicySession.ThreadModel { + + /** Session associated with this persistent thread. */ + private final PolicySession session; + + /** The session thread. */ + private final Thread thread; + + /** Used to indicate that processing should stop. */ + private final CountDownLatch stopped = new CountDownLatch(1); + + /** Minimum time, in milli-seconds, that the thread should sleep before firing rules again. */ + long minSleepTime = 100; + + /** + * Maximum time, in milli-seconds, that the thread should sleep before firing rules again. This + * is a "half" time, so that we can multiply it by two without overflowing the word size. + */ + long halfMaxSleepTime = 5000L / 2L; + + /** + * Constructor - initialize variables and create thread. + * + * @param session the 'PolicySession' instance + * @param properties may contain additional session properties + */ + public PersistentThreadModel(PolicySession session, Properties properties) { + this.session = session; + this.thread = new Thread(this, getThreadName()); + + if (properties == null) { + return; + } + + // extract 'minSleepTime' and/or 'maxSleepTime' + String name = session.getName(); + + // fetch 'minSleepTime' value, and update if defined + String sleepTimeString = getProperty(properties, name, "minSleepTime"); + if (sleepTimeString != null) { + try { + minSleepTime = Math.max(1, Integer.valueOf(sleepTimeString)); + } catch (Exception e) { + logger.error(sleepTimeString + ": Illegal value for 'minSleepTime'", e); + } + } + + // fetch 'maxSleepTime' value, and update if defined + long maxSleepTime = 2 * halfMaxSleepTime; + sleepTimeString = getProperty(properties, name, "maxSleepTime"); + if (sleepTimeString != null) { + try { + maxSleepTime = Math.max(1, Integer.valueOf(sleepTimeString)); + } catch (Exception e) { + logger.error(sleepTimeString + ": Illegal value for 'maxSleepTime'", e); + } + } + + // swap values if needed + if (minSleepTime > maxSleepTime) { + logger.error( + "minSleepTime(" + + minSleepTime + + ") is greater than maxSleepTime(" + + maxSleepTime + + ") -- swapping"); + long tmp = minSleepTime; + minSleepTime = maxSleepTime; + maxSleepTime = tmp; + } + + halfMaxSleepTime = Math.max(1, maxSleepTime / 2); + } + + /** + * Get thread name. + * + * @return the String to use as the thread name */ + private String getThreadName() { + return "Session " + session.getFullName() + " (persistent)"; + } + + /** ************************ */ + /* 'ThreadModel' interface */ + /** ************************ */ + + /** + * {@inheritDoc} */ + @Override + public void start() { + thread.start(); + } + + /** + * {@inheritDoc} */ + @Override + public void stop() { + // tell the thread to stop + stopped.countDown(); + + // wait up to 10 seconds for the thread to stop + try { + thread.join(10000); + + } catch (InterruptedException e) { + logger.error("stopThread exception: ", e); + Thread.currentThread().interrupt(); + } + + // verify that it's done + if (thread.isAlive()) { + logger.error("stopThread: still running"); + } + } + + /** + * {@inheritDoc} */ + @Override + public void updated() { + // the container artifact has been updated -- adjust the thread name + thread.setName(getThreadName()); + } + + /** ********************* */ + /* 'Runnable' interface */ + /** ********************* */ + + /** + * {@inheritDoc} */ + @Override + public void run() { + logger.info("PersistentThreadModel running"); + + // set thread local variable + session.setPolicySession(); + + KieSession kieSession = session.getKieSession(); + long sleepTime = 2 * halfMaxSleepTime; + + // We want to continue, despite any exceptions that occur + // while rules are fired. + + boolean cont = true; + while (cont) { + + try { + if (kieSession.fireAllRules() > 0) { + // some rules fired -- reduce poll delay + sleepTime = Math.max(minSleepTime, sleepTime / 2); + } else { + // no rules fired -- increase poll delay + sleepTime = 2 * Math.min(halfMaxSleepTime, sleepTime); + } + + } catch (Exception | LinkageError e) { + logger.error("Exception during kieSession.fireAllRules", e); + } + + try { + if (stopped.await(sleepTime, TimeUnit.MILLISECONDS)) { + cont = false; + } + + } catch (InterruptedException e) { + logger.error("startThread exception: ", e); + Thread.currentThread().interrupt(); + cont = false; + } + } + + logger.info("PersistentThreadModel completed"); + } + } + + /* ============================================================ */ + + /** DataSource-EntityManagerFactory pair. */ + private class DsEmf { + private BasicDataSource bds; + private EntityManagerFactory emf; + + /** + * Makes an entity manager factory for the given data source. + * + * @param bds pooled data source + */ + public DsEmf(BasicDataSource bds) { + try { + Map<String, Object> props = new HashMap<>(); + props.put(org.hibernate.cfg.Environment.JPA_JTA_DATASOURCE, bds); + + this.bds = bds; + this.emf = fact.makeEntMgrFact(props); + + } catch (RuntimeException e) { + closeDataSource(); + throw e; + } + } + + /** Closes the entity manager factory and the data source. */ + public void close() { + try { + emf.close(); + + } catch (RuntimeException e) { + closeDataSource(); + throw e; + } + + closeDataSource(); + } + + /** Closes the data source only. */ + private void closeDataSource() { + try { + bds.close(); + + } catch (SQLException e) { + throw new PersistenceFeatureException(e); + } + } + } + + private static class SingletonRegistry { + private static final TransactionSynchronizationRegistry transreg = + new com.arjuna.ats.internal.jta.transaction.arjunacore + .TransactionSynchronizationRegistryImple(); + + private SingletonRegistry() { + super(); + } + } + + /** Factory for various items. Methods can be overridden for junit testing. */ + protected static class Factory { + + /** + * Gets the transaction manager. + * + * @return the transaction manager + */ + public TransactionManager getTransMgr() { + return com.arjuna.ats.jta.TransactionManager.transactionManager(); + } + + /** + * Gets the user transaction. + * + * @return the user transaction + */ + public UserTransaction getUserTrans() { + return com.arjuna.ats.jta.UserTransaction.userTransaction(); + } + + /** + * Gets the transaction synchronization registry. + * + * @return the transaction synchronization registry + */ + public TransactionSynchronizationRegistry getTransSyncReg() { + return SingletonRegistry.transreg; + } + + /** + * Gets the KIE services. + * + * @return the KIE services + */ + public KieServices getKieServices() { + return KieServices.Factory.get(); + } + + /** + * Loads properties from a file. + * + * @param filenm name of the file to load + * @return properties, as loaded from the file + * @throws IOException if an error occurs reading from the file + */ + public Properties loadProperties(String filenm) throws IOException { + return PropertyUtil.getProperties(filenm); + } + + /** + * Makes a Data Source. + * + * @param dsProps data source properties + * @return a new data source + */ + public BasicDataSource makeDataSource(Properties dsProps) { + try { + return BasicDataSourceFactory.createDataSource(dsProps); + + } catch (Exception e) { + throw new PersistenceFeatureException(e); + } + } + + /** + * Makes a new JPA connector for drools sessions. + * + * @param emf entity manager factory + * @return a new JPA connector for drools sessions + */ + public DroolsSessionConnector makeJpaConnector(EntityManagerFactory emf) { + return new JpaDroolsSessionConnector(emf); + } + + /** + * Makes a new entity manager factory. + * + * @param props properties with which the factory should be configured + * @return a new entity manager factory + */ + public EntityManagerFactory makeEntMgrFact(Map<String, Object> props) { + return Persistence.createEntityManagerFactory("onapsessionsPU", props); + } + + /** + * Gets the policy controller associated with a given policy container. + * + * @param container container whose controller is to be retrieved + * @return the container's controller + */ + public PolicyController getPolicyController(PolicyContainer container) { + return PolicyController.factory.get(container.getGroupId(), container.getArtifactId()); + } + } + + /** + * Runtime exceptions generated by this class. Wraps exceptions generated by delegated operations, + * particularly when they are not, themselves, Runtime exceptions. + */ + public static class PersistenceFeatureException extends RuntimeException { + private static final long serialVersionUID = 1L; + + /** + * Constructor. + * */ + public PersistenceFeatureException(Exception ex) { + super(ex); + } + } } |