aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--feature-active-standby-management/src/main/java/org/onap/policy/drools/activestandby/DroolsPdpsElectionHandler.java1026
-rw-r--r--feature-active-standby-management/src/main/java/org/onap/policy/drools/activestandby/JpaDroolsPdpsConnector.java45
-rw-r--r--feature-active-standby-management/src/main/java/org/onap/policy/drools/activestandby/PmStandbyStateChangeNotifier.java259
-rw-r--r--feature-simulators/src/main/java/org/onap/policy/drools/simulators/DMaaPSimulatorJaxRs.java7
-rw-r--r--feature-simulators/src/test/java/org/onap/policy/drools/simulators/DMaaPSimulatorJaxRsTest.java14
-rw-r--r--feature-state-management/src/main/java/org/onap/policy/drools/statemanagement/RepositoryAudit.java488
-rw-r--r--policy-core/src/main/java/org/onap/policy/drools/core/PolicyContainer.java42
-rw-r--r--policy-management/pom.xml7
-rw-r--r--policy-management/src/main/java/org/onap/policy/drools/controller/IndexedDroolsControllerFactory.java127
-rw-r--r--policy-management/src/main/java/org/onap/policy/drools/controller/internal/MavenDroolsController.java139
-rw-r--r--policy-management/src/main/java/org/onap/policy/drools/protocol/coders/GenericEventProtocolCoder.java137
-rw-r--r--policy-management/src/main/java/org/onap/policy/drools/system/PolicyController.java4
-rw-r--r--policy-management/src/main/java/org/onap/policy/drools/system/PolicyEngineManager.java701
-rw-r--r--policy-management/src/main/java/org/onap/policy/drools/system/internal/AggregatedPolicyController.java264
-rw-r--r--policy-management/src/test/java/org/onap/policy/drools/controller/internal/MavenDroolsControllerTest.java20
-rw-r--r--policy-management/src/test/java/org/onap/policy/drools/protocol/coders/ProtocolCoderToolsetTest.java2
-rw-r--r--policy-management/src/test/java/org/onap/policy/drools/server/restful/test/RestManagerTest.java10
-rw-r--r--policy-management/src/test/java/org/onap/policy/drools/system/PolicyEngineTest.java5
-rw-r--r--policy-management/src/test/java/org/onap/policy/drools/system/internal/AggregatedPolicyControllerTest.java2
-rw-r--r--policy-management/src/test/resources/echo.drl5
-rw-r--r--policy-utils/src/main/java/org/onap/policy/drools/utils/logging/MdcTransactionImpl.java133
21 files changed, 1698 insertions, 1739 deletions
diff --git a/feature-active-standby-management/src/main/java/org/onap/policy/drools/activestandby/DroolsPdpsElectionHandler.java b/feature-active-standby-management/src/main/java/org/onap/policy/drools/activestandby/DroolsPdpsElectionHandler.java
index 33c8a8d8..85cf88b3 100644
--- a/feature-active-standby-management/src/main/java/org/onap/policy/drools/activestandby/DroolsPdpsElectionHandler.java
+++ b/feature-active-standby-management/src/main/java/org/onap/policy/drools/activestandby/DroolsPdpsElectionHandler.java
@@ -202,255 +202,7 @@ public class DroolsPdpsElectionHandler implements ThreadRunningChecker {
logger.debug("DesignatedWaiter.run: pdps.size= {}", pdps.size());
//This is only true if all designated PDPs have failed
- boolean designatedPdpHasFailed = pdpsConnector.hasDesignatedPdpFailed(pdps);
- logger.debug("DesignatedWaiter.run: designatedPdpHasFailed= {}", designatedPdpHasFailed);
- for (DroolsPdp pdp : pdps) {
- logger.debug("DesignatedWaiter.run: evaluating pdp ID: {}", pdp.getPdpId());
-
- /*
- * Note: side effect of isPdpCurrent is that any stale but
- * designated PDPs will be marked as un-designated.
- */
- boolean isCurrent = pdpsConnector.isPdpCurrent(pdp);
-
- /*
- * We can't use stateManagement.getStandbyStatus() here, because
- * we need the standbyStatus, not for this PDP, but for the PDP
- * being processed by this loop iteration.
- */
- String standbyStatus = stateManagementFeature.getStandbyStatus(pdp.getPdpId());
- if (standbyStatus == null) {
- // Treat this case as a cold standby -- if we
- // abort here, no sessions will be created in a
- // single-node test environment.
- standbyStatus = StateManagement.COLD_STANDBY;
- }
- logger.debug("DesignatedWaiter.run: PDP= {}, isCurrent= {}", pdp.getPdpId(), isCurrent);
-
- /*
- * There are 4 combinations of isDesignated and isCurrent. We will examine each one in-turn
- * and evaluate the each pdp in the list of pdps against each combination.
- *
- * This is the first combination of isDesignated and isCurrent
- */
- if (pdp.isDesignated() && isCurrent) {
- //It is current, but it could have a standbystatus=coldstandby / hotstandby
- //If so, we need to stand it down and demote it
- if (!standbyStatus.equals(StateManagement.PROVIDING_SERVICE)) {
- if (pdp.getPdpId().equals(myPdp.getPdpId())) {
- logger.debug("\n\nDesignatedWaiter.run: myPdp {} is current and designated, "
- + "butstandbystatus is not providingservice. "
- + " Executing stateManagement.demote()" + "\n\n", myPdp.getPdpId());
- // So, we must demote it
- try {
- //Keep the order like this. StateManagement is last since it
- //triggers controller shutdown
- //This will change isDesignated and it can enter another if(combination) below
- pdpsConnector.standDownPdp(pdp.getPdpId());
- myPdp.setDesignated(false);
- isDesignated = false;
- if (!(standbyStatus.equals(StateManagement.HOT_STANDBY)
- || standbyStatus.equals(StateManagement.COLD_STANDBY))) {
- /*
- * Only demote it if it appears it has not already been demoted. Don't worry
- * about synching with the topic endpoint states. That is done by the
- * refreshStateAudit
- */
- stateManagementFeature.demote();
- }
- //update the standbystatus to check in a later
- //combination of isDesignated and isCurrent
- standbyStatus = stateManagementFeature.getStandbyStatus(pdp.getPdpId());
- } catch (Exception e) {
- logger.error("DesignatedWaiter.run: myPdp: {} "
- + "Caught Exception attempting to demote myPdp,"
- + "message= {}", myPdp.getPdpId(), e);
- }
- } else {
- // Don't demote a remote PDP that is current. It should catch itself
- logger.debug("\n\nDesignatedWaiter.run: myPdp {} is current and designated, "
- + "but standbystatus is not providingservice. "
- + " Cannot execute stateManagement.demote() "
- + "since it it is not myPdp\n\n",
- myPdp.getPdpId());
- }
-
- } else {
- // If we get here, it is ok to be on the list
- logger.debug("DesignatedWaiter.run: PDP= {} is designated, "
- + "current and {} Noting PDP as "
- + "designated, standbyStatus= {}",
- pdp.getPdpId(), standbyStatus, standbyStatus);
- listOfDesignated.add(pdp);
- }
-
-
- }
-
-
- /*
- * The second combination of isDesignated and isCurrent
- *
- * PDP is designated but not current; it has failed.
- * So we stand it down (it doesn't matter what
- * its standbyStatus is). None of these go on the list.
- */
- if (pdp.isDesignated() && !isCurrent) {
- logger.debug("INFO: DesignatedWaiter.run: PDP= {} is currently "
- + "designated but is not current; "
- + "it has failed. Standing down. standbyStatus= {}",
- pdp.getPdpId(), standbyStatus);
- /*
- * Changes designated to 0 but it is still potentially providing service
- * Will affect isDesignated, so, it can enter an if(combination) below
- */
- pdpsConnector.standDownPdp(pdp.getPdpId());
-
- //need to change standbystatus to coldstandby
- if (pdp.getPdpId().equals(myPdp.getPdpId())) {
- logger.debug("\n\nDesignatedWaiter.run: myPdp {} is not Current. "
- + " Executing stateManagement.disableFailed()\n\n", myPdp.getPdpId());
- // We found that myPdp is designated but not current
- // So, we must cause it to disableFail
- try {
- myPdp.setDesignated(false);
- pdpsConnector.setDesignated(myPdp, false);
- isDesignated = false;
- stateManagementFeature.disableFailed();
- } catch (Exception e) {
- logger.error("DesignatedWaiter.run: myPdp: {} Caught Exception "
- + "attempting to disableFail myPdp {}, message= {}",
- myPdp.getPdpId(), myPdp.getPdpId(), e);
- }
- } else { //it is a remote PDP that is failed
- logger.debug("\n\nDesignatedWaiter.run: PDP {} is not Current. "
- + " Executing stateManagement.disableFailed(otherResourceName)\n\n",
- pdp.getPdpId() );
- // We found a PDP is designated but not current
- // We already called standdown(pdp) which will change designated to false
- // Now we need to disableFail it to get its states in synch. The standbyStatus
- // should equal coldstandby
- try {
- stateManagementFeature.disableFailed(pdp.getPdpId());
- } catch (Exception e) {
- logger.error("DesignatedWaiter.run: for PDP {} Caught Exception attempting to "
- + "disableFail({}), message= {}",
- pdp.getPdpId(), pdp.getPdpId(), e);
- }
-
- }
- continue; //we are not going to do anything else with this pdp
- }
-
- /*
- * The third combination of isDesignated and isCurrent
- * /*
- * If a PDP is not currently designated but is providing service
- * (erroneous, but recoverable) or hot standby
- * we can add it to the list of possible designated if all the designated have failed
- */
- if (!pdp.isDesignated() && isCurrent) {
- if (!(standbyStatus.equals(StateManagement.HOT_STANDBY)
- || standbyStatus.equals(StateManagement.COLD_STANDBY))) {
- logger.debug("\n\nDesignatedWaiter.run: PDP {}"
- + " is NOT designated but IS current and"
- + " has a standbystatus= {}", pdp.getPdpId(), standbyStatus);
- // Since it is current, we assume it can adjust its own state.
- // We will demote if it is myPdp
- if (pdp.getPdpId().equals(myPdp.getPdpId())) {
- //demote it
- logger.debug("DesignatedWaiter.run: PDP {} going to "
- + "setDesignated = false and calling stateManagement.demote",
- pdp.getPdpId());
- try {
- //Keep the order like this.
- //StateManagement is last since it triggers controller shutdown
- pdpsConnector.setDesignated(myPdp, false);
- myPdp.setDesignated(false);
- isDesignated = false;
- //This is definitely not a redundant call.
- //It is attempting to correct a problem
- stateManagementFeature.demote();
- //recheck the standbystatus
- standbyStatus = stateManagementFeature.getStandbyStatus(pdp.getPdpId());
- } catch (Exception e) {
- logger.error("DesignatedWaiter.run: myPdp: {} Caught Exception "
- + "attempting to demote myPdp {}, message = {}", myPdp.getPdpId(),
- myPdp.getPdpId(), e);
- }
-
- }
- }
- if (standbyStatus.equals(StateManagement.HOT_STANDBY) && designatedPdpHasFailed) {
- //add it to the list
- logger.debug("INFO: DesignatedWaiter.run: PDP= {}"
- + " is not designated but is {} and designated PDP "
- + "has failed. standbyStatus= {}", pdp.getPdpId(),
- standbyStatus, standbyStatus);
- listOfDesignated.add(pdp);
- }
- continue; //done with this one
- }
-
- /*
- * The fourth combination of isDesignated and isCurrent
- *
- * We are not going to put any of these on the list since it appears they have failed.
-
- *
- */
- if (!pdp.isDesignated() && !isCurrent) {
- logger.debug("INFO: DesignatedWaiter.run: PDP= {} "
- + "designated= {}, current= {}, "
- + "designatedPdpHasFailed= {}, "
- + "standbyStatus= {}",pdp.getPdpId(),
- pdp.isDesignated(), isCurrent, designatedPdpHasFailed, standbyStatus);
- if (!standbyStatus.equals(StateManagement.COLD_STANDBY)) {
- //stand it down
- //disableFail it
- pdpsConnector.standDownPdp(pdp.getPdpId());
- if (pdp.getPdpId().equals(myPdp.getPdpId())) {
- /*
- * I don't actually know how this condition could
- * happen, but if it did, we would want to declare it
- * failed.
- */
- logger.debug("\n\nDesignatedWaiter.run: myPdp {} is !current and !designated, "
- + " Executing stateManagement.disableFailed()\n\n",
- myPdp.getPdpId());
- // So, we must disableFail it
- try {
- //Keep the order like this.
- //StateManagement is last since it triggers controller shutdown
- pdpsConnector.setDesignated(myPdp, false);
- myPdp.setDesignated(false);
- isDesignated = false;
- stateManagementFeature.disableFailed();
- } catch (Exception e) {
- logger.error("DesignatedWaiter.run: myPdp: {} Caught Exception attempting to "
- + "disableFail myPdp {}, message= {}",
- myPdp.getPdpId(), myPdp.getPdpId(), e);
- }
- } else { //it is remote
- logger.debug("\n\nDesignatedWaiter.run: myPdp {} is !current and !designated, "
- + " Executing stateManagement.disableFailed({})\n\n",
- myPdp.getPdpId(), pdp.getPdpId());
- // We already called standdown(pdp) which will change designated to false
- // Now we need to disableFail it to get its states in sync.
- // StandbyStatus = coldstandby
- try {
- stateManagementFeature.disableFailed(pdp.getPdpId());
- } catch (Exception e) {
- logger.error("DesignatedWaiter.run: for PDP {}"
- + " Caught Exception attempting to disableFail({})"
- + ", message=", pdp.getPdpId(), pdp.getPdpId(), e);
- }
- }
- }
- }
-
-
- } // end pdps loop
+ allPdpsFailed(pdps, listOfDesignated);
/*
* We have checked the four combinations of isDesignated and isCurrent. Where appropriate,
@@ -479,7 +231,7 @@ public class DroolsPdpsElectionHandler implements ThreadRunningChecker {
/*
- * It is possible to get here with more than one pdp designated and providingservice. This normally
+ * It is possible to get here with more than one pdp designated and providing service. This normally
* occurs when there is a race condition with multiple nodes coming up at the same time. If that is
* the case we must determine which one is the one that should be designated and which one should
* be demoted.
@@ -490,82 +242,22 @@ public class DroolsPdpsElectionHandler implements ThreadRunningChecker {
* the previously designated pdp.
*/
DroolsPdp designatedPdp = computeDesignatedPdp(listOfDesignated, mostRecentPrimary);
- if (designatedPdp != null) {
- pdpdNowActive = designatedPdp.getPdpId();
- }
if (designatedPdp == null) {
logger.warn("WARNING: DesignatedWaiter.run: No viable PDP found to be Designated. "
+ "designatedPdp still null.");
- // Just to be sure the parameters are correctly set
- myPdp.setDesignated(false);
- pdpsConnector.setDesignated(myPdp,false);
- isDesignated = false;
-
- waitTimerLastRunDate = new Date();
- logger.debug("DesignatedWaiter.run (designatedPdp == null) waitTimerLastRunDate = {}",
- waitTimerLastRunDate);
- myPdp.setUpdatedDate(waitTimerLastRunDate);
- pdpsConnector.update(myPdp);
-
+ designateNoPdp();
return;
+ }
- } else if (designatedPdp.getPdpId().equals(myPdp.getPdpId())) {
- logger.debug("DesignatedWaiter.run: designatedPdp is PDP={}", myPdp.getPdpId());
- /*
- * update function expects myPdp.isDesignated to be true.
- */
- try {
- //Keep the order like this. StateManagement is last since it triggers controller init
- myPdp.setDesignated(true);
- myPdp.setDesignatedDate(new Date());
- pdpsConnector.setDesignated(myPdp, true);
- isDesignated = true;
- String standbyStatus = stateManagementFeature.getStandbyStatus();
- if (!standbyStatus.equals(StateManagement.PROVIDING_SERVICE)) {
- /*
- * Only call promote if it is not already in the right state. Don't worry about
- * synching the lower level topic endpoint states. That is done by the
- * refreshStateAudit.
- * Note that we need to fetch the session list from 'mostRecentPrimary'
- * at this point -- soon, 'mostRecentPrimary' will be set to this host.
- */
- //this.sessions = mostRecentPrimary.getSessions();
- stateManagementFeature.promote();
- }
- } catch (Exception e) {
- logger.error("ERROR: DesignatedWaiter.run: Caught Exception attempting to promote PDP={}"
- + ", message=", myPdp.getPdpId(), e);
- myPdp.setDesignated(false);
- pdpsConnector.setDesignated(myPdp,false);
- isDesignated = false;
- //If you can't promote it, demote it
- try {
- String standbyStatus = stateManagementFeature.getStandbyStatus();
- if (!(standbyStatus.equals(StateManagement.HOT_STANDBY)
- || standbyStatus.equals(StateManagement.COLD_STANDBY))) {
- /*
- * Only call demote if it is not already in the right state. Don't worry about
- * synching the lower level topic endpoint states. That is done by the
- * refreshStateAudit.
- */
- stateManagementFeature.demote();
- }
- } catch (Exception e1) {
- logger.error("ERROR: DesignatedWaiter.run: Caught StandbyStatusException "
- + "attempting to promote then demote PDP={}, message=",
- myPdp.getPdpId(), e1);
- }
-
- }
- waitTimerLastRunDate = new Date();
- logger.debug("DesignatedWaiter.run (designatedPdp.getPdpId().equals(myPdp.getPdpId())) "
- + "waitTimerLastRunDate = " + waitTimerLastRunDate);
- myPdp.setUpdatedDate(waitTimerLastRunDate);
- pdpsConnector.update(myPdp);
+ pdpdNowActive = designatedPdp.getPdpId();
+ if (pdpdNowActive.equals(myPdp.getPdpId())) {
+ logger.debug("DesignatedWaiter.run: designatedPdp is PDP={}", myPdp.getPdpId());
+ designateMyPdp();
return;
}
+
isDesignated = false;
} // end synchronized
@@ -583,6 +275,350 @@ public class DroolsPdpsElectionHandler implements ThreadRunningChecker {
logger.error("DesignatedWaiter.run caught an unexpected exception: ", e);
}
} // end run
+
+ private void allPdpsFailed(Collection<DroolsPdp> pdps, List<DroolsPdp> listOfDesignated) {
+ boolean designatedPdpHasFailed = pdpsConnector.hasDesignatedPdpFailed(pdps);
+ logger.debug("DesignatedWaiter.run: designatedPdpHasFailed= {}", designatedPdpHasFailed);
+ for (DroolsPdp pdp : pdps) {
+ logger.debug("DesignatedWaiter.run: evaluating pdp ID: {}", pdp.getPdpId());
+
+ /*
+ * Note: side effect of isPdpCurrent is that any stale but
+ * designated PDPs will be marked as un-designated.
+ */
+ boolean isCurrent = pdpsConnector.isPdpCurrent(pdp);
+
+ /*
+ * We can't use stateManagement.getStandbyStatus() here, because
+ * we need the standbyStatus, not for this PDP, but for the PDP
+ * being processed by this loop iteration.
+ */
+ String standbyStatus = stateManagementFeature.getStandbyStatus(pdp.getPdpId());
+ if (standbyStatus == null) {
+ // Treat this case as a cold standby -- if we
+ // abort here, no sessions will be created in a
+ // single-node test environment.
+ standbyStatus = StateManagement.COLD_STANDBY;
+ }
+ logger.debug("DesignatedWaiter.run: PDP= {}, isCurrent= {}", pdp.getPdpId(), isCurrent);
+
+ adjustPdp(pdp, isCurrent, designatedPdpHasFailed, standbyStatus, listOfDesignated);
+
+
+ } // end pdps loop
+ }
+
+ private void adjustPdp(DroolsPdp pdp, boolean isCurrent, boolean designatedPdpHasFailed, String standbyStatus,
+ List<DroolsPdp> listOfDesignated) {
+ /*
+ * There are 4 combinations of isDesignated and isCurrent. We will examine each one in-turn
+ * and evaluate the each pdp in the list of pdps against each combination.
+ */
+ if (pdp.isDesignated()) {
+ /*
+ * This is the first combination of isDesignated and isCurrent
+ */
+ if (isCurrent) {
+ pdpDesignatedCurrent(pdp, standbyStatus, listOfDesignated);
+
+ /*
+ * The second combination of isDesignated and isCurrent
+ *
+ * PDP is designated but not current; it has failed.
+ * So we stand it down (it doesn't matter what
+ * its standbyStatus is). None of these go on the list.
+ */
+ } else {
+ logger.debug("INFO: DesignatedWaiter.run: PDP= {} is currently "
+ + "designated but is not current; "
+ + "it has failed. Standing down. standbyStatus= {}",
+ pdp.getPdpId(), standbyStatus);
+ pdpDesignatedNotCurrent(pdp);
+ }
+
+ } else {
+ // NOT designated
+
+
+ /*
+ * The third combination of isDesignated and isCurrent
+ * /*
+ * If a PDP is not currently designated but is providing service
+ * (erroneous, but recoverable) or hot standby
+ * we can add it to the list of possible designated if all the designated have failed
+ */
+ if (isCurrent) {
+ pdpNotDesignatedCurrent(pdp, designatedPdpHasFailed, standbyStatus,
+ listOfDesignated);
+
+ /*
+ * The fourth combination of isDesignated and isCurrent
+ *
+ * We are not going to put any of these on the list since it appears they have failed.
+ *
+ */
+ } else {
+ logger.debug("INFO: DesignatedWaiter.run: PDP= {} "
+ + "designated= {}, current= {}, "
+ + "designatedPdpHasFailed= {}, "
+ + "standbyStatus= {}",pdp.getPdpId(),
+ pdp.isDesignated(), false, designatedPdpHasFailed, standbyStatus);
+ pdpNotDesignatedNotCurrent(pdp, standbyStatus);
+ }
+ }
+ }
+
+ private void pdpDesignatedCurrent(DroolsPdp pdp, String standbyStatus, List<DroolsPdp> listOfDesignated) {
+ //It is current, but it could have a standbystatus=coldstandby / hotstandby
+ //If so, we need to stand it down and demote it
+ if (!standbyStatus.equals(StateManagement.PROVIDING_SERVICE)) {
+ if (pdp.getPdpId().equals(myPdp.getPdpId())) {
+ logger.debug("\n\nDesignatedWaiter.run: myPdp {} is current and designated, "
+ + "butstandbystatus is not providingservice. "
+ + " Executing stateManagement.demote()" + "\n\n", myPdp.getPdpId());
+ // So, we must demote it
+ try {
+ demoteMyPdp(pdp, standbyStatus);
+ } catch (Exception e) {
+ logger.error("DesignatedWaiter.run: myPdp: {} "
+ + "Caught Exception attempting to demote myPdp,"
+ + "message= {}", myPdp.getPdpId(), e);
+ }
+ } else {
+ // Don't demote a remote PDP that is current. It should catch itself
+ logger.debug("\n\nDesignatedWaiter.run: myPdp {} is current and designated, "
+ + "but standbystatus is not providingservice. "
+ + " Cannot execute stateManagement.demote() "
+ + "since it it is not myPdp\n\n",
+ myPdp.getPdpId());
+ }
+
+ } else {
+ // If we get here, it is ok to be on the list
+ logger.debug("DesignatedWaiter.run: PDP= {} is designated, "
+ + "current and {} Noting PDP as "
+ + "designated, standbyStatus= {}",
+ pdp.getPdpId(), standbyStatus, standbyStatus);
+ listOfDesignated.add(pdp);
+ }
+ }
+
+ private void demoteMyPdp(DroolsPdp pdp, String standbyStatus) throws Exception {
+ //Keep the order like this. StateManagement is last since it
+ //triggers controller shutdown
+ //This will change isDesignated and it can enter another if(combination) below
+ pdpsConnector.standDownPdp(pdp.getPdpId());
+ myPdp.setDesignated(false);
+ isDesignated = false;
+ if (!(standbyStatus.equals(StateManagement.HOT_STANDBY)
+ || standbyStatus.equals(StateManagement.COLD_STANDBY))) {
+ /*
+ * Only demote it if it appears it has not already been demoted. Don't worry
+ * about synching with the topic endpoint states. That is done by the
+ * refreshStateAudit
+ */
+ stateManagementFeature.demote();
+ }
+ }
+
+ private void pdpDesignatedNotCurrent(DroolsPdp pdp) {
+ /*
+ * Changes designated to 0 but it is still potentially providing service
+ * Will affect isDesignated, so, it can enter an if(combination) below
+ */
+ pdpsConnector.standDownPdp(pdp.getPdpId());
+
+ //need to change standbystatus to coldstandby
+ if (pdp.getPdpId().equals(myPdp.getPdpId())) {
+ logger.debug("\n\nDesignatedWaiter.run: myPdp {} is not Current. "
+ + " Executing stateManagement.disableFailed()\n\n", myPdp.getPdpId());
+ // We found that myPdp is designated but not current
+ // So, we must cause it to disableFail
+ try {
+ myPdp.setDesignated(false);
+ pdpsConnector.setDesignated(myPdp, false);
+ isDesignated = false;
+ stateManagementFeature.disableFailed();
+ } catch (Exception e) {
+ logger.error("DesignatedWaiter.run: myPdp: {} Caught Exception "
+ + "attempting to disableFail myPdp {}, message= {}",
+ myPdp.getPdpId(), myPdp.getPdpId(), e);
+ }
+ } else { //it is a remote PDP that is failed
+ logger.debug("\n\nDesignatedWaiter.run: PDP {} is not Current. "
+ + " Executing stateManagement.disableFailed(otherResourceName)\n\n",
+ pdp.getPdpId() );
+ // We found a PDP is designated but not current
+ // We already called standdown(pdp) which will change designated to false
+ // Now we need to disableFail it to get its states in synch. The standbyStatus
+ // should equal coldstandby
+ try {
+ stateManagementFeature.disableFailed(pdp.getPdpId());
+ } catch (Exception e) {
+ logger.error("DesignatedWaiter.run: for PDP {} Caught Exception attempting to "
+ + "disableFail({}), message= {}",
+ pdp.getPdpId(), pdp.getPdpId(), e);
+ }
+
+ }
+ }
+
+ private void pdpNotDesignatedCurrent(DroolsPdp pdp, boolean designatedPdpHasFailed, String standbyStatus,
+ List<DroolsPdp> listOfDesignated) {
+ if (!(StateManagement.HOT_STANDBY.equals(standbyStatus)
+ || StateManagement.COLD_STANDBY.equals(standbyStatus))) {
+ logger.debug("\n\nDesignatedWaiter.run: PDP {}"
+ + " is NOT designated but IS current and"
+ + " has a standbystatus= {}", pdp.getPdpId(), standbyStatus);
+ // Since it is current, we assume it can adjust its own state.
+ // We will demote if it is myPdp
+ if (pdp.getPdpId().equals(myPdp.getPdpId())) {
+ //demote it
+ logger.debug("DesignatedWaiter.run: PDP {} going to "
+ + "setDesignated = false and calling stateManagement.demote",
+ pdp.getPdpId());
+ try {
+ //Keep the order like this.
+ //StateManagement is last since it triggers controller shutdown
+ pdpsConnector.setDesignated(myPdp, false);
+ myPdp.setDesignated(false);
+ isDesignated = false;
+ //This is definitely not a redundant call.
+ //It is attempting to correct a problem
+ stateManagementFeature.demote();
+ //recheck the standbystatus
+ standbyStatus = stateManagementFeature.getStandbyStatus(pdp.getPdpId());
+ } catch (Exception e) {
+ logger.error("DesignatedWaiter.run: myPdp: {} Caught Exception "
+ + "attempting to demote myPdp {}, message = {}", myPdp.getPdpId(),
+ myPdp.getPdpId(), e);
+ }
+
+ }
+ }
+ if (StateManagement.HOT_STANDBY.equals(standbyStatus) && designatedPdpHasFailed) {
+ //add it to the list
+ logger.debug("INFO: DesignatedWaiter.run: PDP= {}"
+ + " is not designated but is {} and designated PDP "
+ + "has failed. standbyStatus= {}", pdp.getPdpId(),
+ standbyStatus, standbyStatus);
+ listOfDesignated.add(pdp);
+ }
+ }
+
+ private void pdpNotDesignatedNotCurrent(DroolsPdp pdp, String standbyStatus) {
+ if (StateManagement.COLD_STANDBY.equals(standbyStatus)) {
+ return;
+ }
+
+ //stand it down
+ //disableFail it
+ pdpsConnector.standDownPdp(pdp.getPdpId());
+ if (pdp.getPdpId().equals(myPdp.getPdpId())) {
+ /*
+ * I don't actually know how this condition could
+ * happen, but if it did, we would want to declare it
+ * failed.
+ */
+ logger.debug("\n\nDesignatedWaiter.run: myPdp {} is !current and !designated, "
+ + " Executing stateManagement.disableFailed()\n\n",
+ myPdp.getPdpId());
+ // So, we must disableFail it
+ try {
+ //Keep the order like this.
+ //StateManagement is last since it triggers controller shutdown
+ pdpsConnector.setDesignated(myPdp, false);
+ myPdp.setDesignated(false);
+ isDesignated = false;
+ stateManagementFeature.disableFailed();
+ } catch (Exception e) {
+ logger.error("DesignatedWaiter.run: myPdp: {} Caught Exception attempting to "
+ + "disableFail myPdp {}, message= {}",
+ myPdp.getPdpId(), myPdp.getPdpId(), e);
+ }
+ } else { //it is remote
+ logger.debug("\n\nDesignatedWaiter.run: myPdp {} is !current and !designated, "
+ + " Executing stateManagement.disableFailed({})\n\n",
+ myPdp.getPdpId(), pdp.getPdpId());
+ // We already called standdown(pdp) which will change designated to false
+ // Now we need to disableFail it to get its states in sync.
+ // StandbyStatus = coldstandby
+ try {
+ stateManagementFeature.disableFailed(pdp.getPdpId());
+ } catch (Exception e) {
+ logger.error("DesignatedWaiter.run: for PDP {}"
+ + " Caught Exception attempting to disableFail({})"
+ + ", message=", pdp.getPdpId(), pdp.getPdpId(), e);
+ }
+ }
+ }
+
+ private void designateNoPdp() {
+ // Just to be sure the parameters are correctly set
+ myPdp.setDesignated(false);
+ pdpsConnector.setDesignated(myPdp,false);
+ isDesignated = false;
+
+ waitTimerLastRunDate = new Date();
+ logger.debug("DesignatedWaiter.run (designatedPdp == null) waitTimerLastRunDate = {}",
+ waitTimerLastRunDate);
+ myPdp.setUpdatedDate(waitTimerLastRunDate);
+ pdpsConnector.update(myPdp);
+ }
+
+ private void designateMyPdp() {
+ /*
+ * update function expects myPdp.isDesignated to be true.
+ */
+ try {
+ //Keep the order like this. StateManagement is last since it triggers controller init
+ myPdp.setDesignated(true);
+ myPdp.setDesignatedDate(new Date());
+ pdpsConnector.setDesignated(myPdp, true);
+ isDesignated = true;
+ String standbyStatus = stateManagementFeature.getStandbyStatus();
+ if (!standbyStatus.equals(StateManagement.PROVIDING_SERVICE)) {
+ /*
+ * Only call promote if it is not already in the right state. Don't worry about
+ * synching the lower level topic endpoint states. That is done by the
+ * refreshStateAudit.
+ * Note that we need to fetch the session list from 'mostRecentPrimary'
+ * at this point -- soon, 'mostRecentPrimary' will be set to this host.
+ */
+ //this.sessions = mostRecentPrimary.getSessions();
+ stateManagementFeature.promote();
+ }
+ } catch (Exception e) {
+ logger.error("ERROR: DesignatedWaiter.run: Caught Exception attempting to promote PDP={}"
+ + ", message=", myPdp.getPdpId(), e);
+ myPdp.setDesignated(false);
+ pdpsConnector.setDesignated(myPdp,false);
+ isDesignated = false;
+ //If you can't promote it, demote it
+ try {
+ String standbyStatus = stateManagementFeature.getStandbyStatus();
+ if (!(standbyStatus.equals(StateManagement.HOT_STANDBY)
+ || standbyStatus.equals(StateManagement.COLD_STANDBY))) {
+ /*
+ * Only call demote if it is not already in the right state. Don't worry about
+ * synching the lower level topic endpoint states. That is done by the
+ * refreshStateAudit.
+ */
+ stateManagementFeature.demote();
+ }
+ } catch (Exception e1) {
+ logger.error("ERROR: DesignatedWaiter.run: Caught StandbyStatusException "
+ + "attempting to promote then demote PDP={}, message=",
+ myPdp.getPdpId(), e1);
+ }
+
+ }
+ waitTimerLastRunDate = new Date();
+ logger.debug("DesignatedWaiter.run (designatedPdp.getPdpId().equals(myPdp.getPdpId())) "
+ + "waitTimerLastRunDate = " + waitTimerLastRunDate);
+ myPdp.setUpdatedDate(waitTimerLastRunDate);
+ pdpsConnector.update(myPdp);
+ }
}
/**
@@ -621,53 +657,27 @@ public class DroolsPdpsElectionHandler implements ThreadRunningChecker {
* @return drools pdp object
*/
public DroolsPdp computeMostRecentPrimary(Collection<DroolsPdp> pdps, List<DroolsPdp> listOfDesignated) {
- boolean containsDesignated = false;
- for (DroolsPdp pdp : listOfDesignated) {
- if (pdp.isDesignated()) {
- containsDesignated = true;
- }
- }
+ boolean containsDesignated = listOfDesignated.stream().anyMatch(DroolsPdp::isDesignated);
+
DroolsPdp mostRecentPrimary = new DroolsPdpImpl(null, true, 1, new Date(0));
mostRecentPrimary.setSite(null);
logger.debug("DesignatedWaiter.run listOfDesignated.size() = {}", listOfDesignated.size());
+
if (listOfDesignated.size() <= 1) {
logger.debug("DesignatedWainter.run: listOfDesignated.size <=1");
//Only one or none is designated or hot standby. Choose the latest designated date
- for (DroolsPdp pdp : pdps) {
- logger.debug("DesignatedWaiter.run pdp = {}"
- + " pdp.getDesignatedDate() = {}",
- pdp.getPdpId(), pdp.getDesignatedDate());
- if (pdp.getDesignatedDate().compareTo(mostRecentPrimary.getDesignatedDate()) > 0) {
- mostRecentPrimary = pdp;
- logger.debug(RUN_PRIMARY_MSG, mostRecentPrimary.getPdpId());
- }
- }
+ mostRecentPrimary = getLatestDesignated(pdps, mostRecentPrimary);
+
} else if (listOfDesignated.size() == pdps.size()) {
logger.debug("DesignatedWainter.run: listOfDesignated.size = pdps.size() which is {}", pdps.size());
//They are all designated or all hot standby.
- mostRecentPrimary = null;
- for (DroolsPdp pdp : pdps) {
- if (mostRecentPrimary == null) {
- mostRecentPrimary = pdp;
- continue;
- }
- if (containsDesignated) { //Choose the site of the first designated date
- if (pdp.getDesignatedDate().compareTo(mostRecentPrimary.getDesignatedDate()) < 0) {
- mostRecentPrimary = pdp;
- logger.debug(RUN_PRIMARY_MSG, mostRecentPrimary.getPdpId());
- }
- } else { //Choose the site with the latest designated date
- if (pdp.getDesignatedDate().compareTo(mostRecentPrimary.getDesignatedDate()) > 0) {
- mostRecentPrimary = pdp;
- logger.debug(RUN_PRIMARY_MSG, mostRecentPrimary.getPdpId());
- }
- }
- }
+ mostRecentPrimary = getBestDesignated(pdps, containsDesignated);
+
} else {
logger.debug("DesignatedWainter.run: Some but not all are designated or hot standby. ");
+ logger.debug("DesignatedWainter.run: containsDesignated = {}", containsDesignated);
//Some but not all are designated or hot standby.
if (containsDesignated) {
- logger.debug("DesignatedWainter.run: containsDesignated = {}", containsDesignated);
/*
* The list only contains designated. This is a problem. It is most likely a race
* condition that resulted in two thinking they should be designated. Choose the
@@ -675,29 +685,66 @@ public class DroolsPdpsElectionHandler implements ThreadRunningChecker {
* This should be the site that had the last designation before this race condition
* occurred.
*/
- for (DroolsPdp pdp : pdps) {
- if (listOfDesignated.contains(pdp)) {
- continue; //Don't consider this entry
- }
- if (pdp.getDesignatedDate().compareTo(mostRecentPrimary.getDesignatedDate()) > 0) {
- mostRecentPrimary = pdp;
- logger.debug(RUN_PRIMARY_MSG, mostRecentPrimary.getPdpId());
- }
- }
+ mostRecentPrimary = getLatestUndesignated(pdps, mostRecentPrimary, listOfDesignated);
+
} else {
- logger.debug("DesignatedWainter.run: containsDesignated = {}", containsDesignated);
//The list only contains hot standby. Choose the site of the latest designated date
- for (DroolsPdp pdp : pdps) {
- if (pdp.getDesignatedDate().compareTo(mostRecentPrimary.getDesignatedDate()) > 0) {
- mostRecentPrimary = pdp;
- logger.debug(RUN_PRIMARY_MSG, mostRecentPrimary.getPdpId());
- }
+ mostRecentPrimary = getLatestDesignated(pdps, mostRecentPrimary);
+ }
+ }
+ return mostRecentPrimary;
+ }
+
+ private DroolsPdp getBestDesignated(Collection<DroolsPdp> pdps, boolean containsDesignated) {
+ DroolsPdp mostRecentPrimary;
+ mostRecentPrimary = null;
+ for (DroolsPdp pdp : pdps) {
+ if (mostRecentPrimary == null) {
+ mostRecentPrimary = pdp;
+ continue;
+ }
+ if (containsDesignated) { //Choose the site of the first designated date
+ if (pdp.getDesignatedDate().compareTo(mostRecentPrimary.getDesignatedDate()) < 0) {
+ mostRecentPrimary = pdp;
+ logger.debug(RUN_PRIMARY_MSG, mostRecentPrimary.getPdpId());
+ }
+ } else { //Choose the site with the latest designated date
+ if (pdp.getDesignatedDate().compareTo(mostRecentPrimary.getDesignatedDate()) > 0) {
+ mostRecentPrimary = pdp;
+ logger.debug(RUN_PRIMARY_MSG, mostRecentPrimary.getPdpId());
}
}
}
return mostRecentPrimary;
}
+ private DroolsPdp getLatestUndesignated(Collection<DroolsPdp> pdps, DroolsPdp mostRecentPrimary,
+ List<DroolsPdp> listOfDesignated) {
+ for (DroolsPdp pdp : pdps) {
+ if (listOfDesignated.contains(pdp)) {
+ continue; //Don't consider this entry
+ }
+ if (pdp.getDesignatedDate().compareTo(mostRecentPrimary.getDesignatedDate()) > 0) {
+ mostRecentPrimary = pdp;
+ logger.debug(RUN_PRIMARY_MSG, mostRecentPrimary.getPdpId());
+ }
+ }
+ return mostRecentPrimary;
+ }
+
+ private DroolsPdp getLatestDesignated(Collection<DroolsPdp> pdps, DroolsPdp mostRecentPrimary) {
+ for (DroolsPdp pdp : pdps) {
+ logger.debug("DesignatedWaiter.run pdp = {}"
+ + " pdp.getDesignatedDate() = {}",
+ pdp.getPdpId(), pdp.getDesignatedDate());
+ if (pdp.getDesignatedDate().compareTo(mostRecentPrimary.getDesignatedDate()) > 0) {
+ mostRecentPrimary = pdp;
+ logger.debug(RUN_PRIMARY_MSG, mostRecentPrimary.getPdpId());
+ }
+ }
+ return mostRecentPrimary;
+ }
+
/**
* Compue designated pdp.
*
@@ -706,125 +753,148 @@ public class DroolsPdpsElectionHandler implements ThreadRunningChecker {
* @return drools pdp object
*/
public DroolsPdp computeDesignatedPdp(List<DroolsPdp> listOfDesignated, DroolsPdp mostRecentPrimary) {
- DroolsPdp designatedPdp = null;
- DroolsPdp lowestPriorityPdp = null;
- if (listOfDesignated.size() > 1) {
- logger.debug("DesignatedWaiter.run: myPdp: {} listOfDesignated.size(): {}", myPdp.getPdpId(),
- listOfDesignated.size());
- DroolsPdp rejectedPdp = null;
- DroolsPdp lowestPrioritySameSite = null;
- DroolsPdp lowestPriorityDifferentSite = null;
- for (DroolsPdp pdp : listOfDesignated) {
- // We need to determine if another PDP is the lowest priority
- if (nullSafeEquals(pdp.getSite(),mostRecentPrimary.getSite())) {
- if (lowestPrioritySameSite == null) {
- if (lowestPriorityDifferentSite != null) {
- rejectedPdp = lowestPriorityDifferentSite;
- }
- lowestPrioritySameSite = pdp;
- } else {
- if (pdp.getPdpId().equals((lowestPrioritySameSite.getPdpId()))) {
- continue;//nothing to compare
- }
- if (pdp.comparePriority(lowestPrioritySameSite) < 0) {
- logger.debug("\nDesignatedWaiter.run: myPdp {} listOfDesignated pdp ID: {}"
- + " has lower priority than pdp ID: {}",myPdp.getPdpId(), pdp.getPdpId(),
- lowestPrioritySameSite.getPdpId());
- //we need to reject lowestPrioritySameSite
- rejectedPdp = lowestPrioritySameSite;
- lowestPrioritySameSite = pdp;
- } else {
- //we need to reject pdp and keep lowestPrioritySameSite
- logger.debug("\nDesignatedWaiter.run: myPdp {} listOfDesignated pdp ID: {} "
- + " has higher priority than pdp ID: {}", myPdp.getPdpId(),pdp.getPdpId(),
- lowestPrioritySameSite.getPdpId());
- rejectedPdp = pdp;
- }
- }
- } else {
- if (lowestPrioritySameSite != null) {
- //if we already have a candidate for same site, we don't want to bother with different sites
- rejectedPdp = pdp;
- } else {
- if (lowestPriorityDifferentSite == null) {
- lowestPriorityDifferentSite = pdp;
- continue;
- }
- if (pdp.getPdpId().equals((lowestPriorityDifferentSite.getPdpId()))) {
- continue;//nothing to compare
- }
- if (pdp.comparePriority(lowestPriorityDifferentSite) < 0) {
- logger.debug("\nDesignatedWaiter.run: myPdp {} listOfDesignated pdp ID: {}"
- + " has lower priority than pdp ID: {}", myPdp.getPdpId(), pdp.getPdpId(),
- lowestPriorityDifferentSite.getPdpId());
- //we need to reject lowestPriorityDifferentSite
- rejectedPdp = lowestPriorityDifferentSite;
- lowestPriorityDifferentSite = pdp;
- } else {
- //we need to reject pdp and keep lowestPriorityDifferentSite
- logger.debug("\nDesignatedWaiter.run: myPdp {} listOfDesignated pdp ID: {}"
- + " has higher priority than pdp ID: {}", myPdp.getPdpId(), pdp.getPdpId(),
- lowestPriorityDifferentSite.getPdpId());
- rejectedPdp = pdp;
- }
- }
+ if (listOfDesignated.isEmpty()) {
+ logger.debug("\nDesignatedWaiter.run: myPdp: {} listOfDesignated is: EMPTY.", myPdp.getPdpId());
+ return null;
+ }
+
+ if (listOfDesignated.size() == 1) {
+ logger.debug("\nDesignatedWaiter.run: myPdp: {} listOfDesignated "
+ + "has ONE entry. PDP ID: {}", myPdp.getPdpId(), listOfDesignated.get(0).getPdpId());
+ return listOfDesignated.get(0);
+ }
+
+ logger.debug("DesignatedWaiter.run: myPdp: {} listOfDesignated.size(): {}", myPdp.getPdpId(),
+ listOfDesignated.size());
+ DesignatedData data = new DesignatedData();
+ for (DroolsPdp pdp : listOfDesignated) {
+ DroolsPdp rejectedPdp;
+
+ // We need to determine if another PDP is the lowest priority
+ if (nullSafeEquals(pdp.getSite(), mostRecentPrimary.getSite())) {
+ rejectedPdp = data.compareSameSite(pdp);
+ } else {
+ rejectedPdp = data.compareDifferentSite(pdp);
+ }
+ // If the rejectedPdp is myPdp, we need to stand it down and demote it. Each pdp is responsible
+ // for demoting itself
+ if (rejectedPdp != null && nullSafeEquals(rejectedPdp.getPdpId(),myPdp.getPdpId())) {
+ logger.debug("\n\nDesignatedWaiter.run: myPdp: {} listOfDesignated myPdp ID: {}"
+ + " is NOT the lowest priority. Executing stateManagement.demote()\n\n",
+ myPdp.getPdpId(),
+ myPdp.getPdpId());
+ // We found that myPdp is on the listOfDesignated and it is not the lowest priority
+ // So, we must demote it
+ demoteMyPdp();
+ }
+ } //end: for(DroolsPdp pdp : listOfDesignated)
+
+ DroolsPdp lowestPriorityPdp = data.getLowestPriority();
+
+ //now we have a valid value for lowestPriorityPdp
+ logger.debug("\n\nDesignatedWaiter.run: myPdp: {} listOfDesignated "
+ + "found the LOWEST priority pdp ID: {} "
+ + " It is now the designatedPpd from the perspective of myPdp ID: {} \n\n",
+ myPdp.getPdpId(), lowestPriorityPdp.getPdpId(), myPdp);
+ return lowestPriorityPdp;
+
+ }
+
+ private class DesignatedData {
+ private DroolsPdp lowestPrioritySameSite = null;
+ private DroolsPdp lowestPriorityDifferentSite = null;
+
+ private DroolsPdp compareSameSite(DroolsPdp pdp) {
+ if (lowestPrioritySameSite == null) {
+ if (lowestPriorityDifferentSite != null) {
+ //we need to reject lowestPriorityDifferentSite
+ DroolsPdp rejectedPdp = lowestPriorityDifferentSite;
+ lowestPriorityDifferentSite = pdp;
+ return rejectedPdp;
}
- // If the rejectedPdp is myPdp, we need to stand it down and demote it. Each pdp is responsible
- // for demoting itself
- if (rejectedPdp != null && nullSafeEquals(rejectedPdp.getPdpId(),myPdp.getPdpId())) {
- logger.debug("\n\nDesignatedWaiter.run: myPdp: {} listOfDesignated myPdp ID: {}"
- + " is NOT the lowest priority. Executing stateManagement.demote()\n\n",
- myPdp.getPdpId(),
- myPdp.getPdpId());
- // We found that myPdp is on the listOfDesignated and it is not the lowest priority
- // So, we must demote it
- try {
- //Keep the order like this. StateManagement is last since it triggers controller shutdown
- myPdp.setDesignated(false);
- pdpsConnector.setDesignated(myPdp, false);
- isDesignated = false;
- String standbyStatus = stateManagementFeature.getStandbyStatus();
- if (!(standbyStatus.equals(StateManagement.HOT_STANDBY)
- || standbyStatus.equals(StateManagement.COLD_STANDBY))) {
- /*
- * Only call demote if it is not already in the right state. Don't worry about
- * synching the lower level topic endpoint states. That is done by the
- * refreshStateAudit.
- */
- stateManagementFeature.demote();
- }
- } catch (Exception e) {
- myPdp.setDesignated(false);
- pdpsConnector.setDesignated(myPdp, false);
- isDesignated = false;
- logger.error("DesignatedWaiter.run: myPdp: {} Caught Exception attempting to "
- + "demote myPdp {} myPdp.getPdpId(), message= {}", myPdp.getPdpId(),
- e);
- }
+ lowestPrioritySameSite = pdp;
+ return null;
+ } else {
+ if (pdp.getPdpId().equals((lowestPrioritySameSite.getPdpId()))) {
+ return null;//nothing to compare
}
- } //end: for(DroolsPdp pdp : listOfDesignated)
+ if (pdp.comparePriority(lowestPrioritySameSite) < 0) {
+ logger.debug("\nDesignatedWaiter.run: myPdp {} listOfDesignated pdp ID: {}"
+ + " has lower priority than pdp ID: {}",myPdp.getPdpId(), pdp.getPdpId(),
+ lowestPrioritySameSite.getPdpId());
+ //we need to reject lowestPrioritySameSite
+ DroolsPdp rejectedPdp = lowestPrioritySameSite;
+ lowestPrioritySameSite = pdp;
+ return rejectedPdp;
+ } else {
+ //we need to reject pdp and keep lowestPrioritySameSite
+ logger.debug("\nDesignatedWaiter.run: myPdp {} listOfDesignated pdp ID: {} "
+ + " has higher priority than pdp ID: {}", myPdp.getPdpId(),pdp.getPdpId(),
+ lowestPrioritySameSite.getPdpId());
+ return pdp;
+ }
+ }
+ }
+
+ private DroolsPdp compareDifferentSite(DroolsPdp pdp) {
if (lowestPrioritySameSite != null) {
- lowestPriorityPdp = lowestPrioritySameSite;
+ //if we already have a candidate for same site, we don't want to bother with different sites
+ return pdp;
} else {
- lowestPriorityPdp = lowestPriorityDifferentSite;
+ if (lowestPriorityDifferentSite == null) {
+ lowestPriorityDifferentSite = pdp;
+ return null;
+ }
+ if (pdp.getPdpId().equals((lowestPriorityDifferentSite.getPdpId()))) {
+ return null;//nothing to compare
+ }
+ if (pdp.comparePriority(lowestPriorityDifferentSite) < 0) {
+ logger.debug("\nDesignatedWaiter.run: myPdp {} listOfDesignated pdp ID: {}"
+ + " has lower priority than pdp ID: {}", myPdp.getPdpId(), pdp.getPdpId(),
+ lowestPriorityDifferentSite.getPdpId());
+ //we need to reject lowestPriorityDifferentSite
+ DroolsPdp rejectedPdp = lowestPriorityDifferentSite;
+ lowestPriorityDifferentSite = pdp;
+ return rejectedPdp;
+ } else {
+ //we need to reject pdp and keep lowestPriorityDifferentSite
+ logger.debug("\nDesignatedWaiter.run: myPdp {} listOfDesignated pdp ID: {}"
+ + " has higher priority than pdp ID: {}", myPdp.getPdpId(), pdp.getPdpId(),
+ lowestPriorityDifferentSite.getPdpId());
+ return pdp;
+ }
}
- //now we have a valid value for lowestPriorityPdp
- logger.debug("\n\nDesignatedWaiter.run: myPdp: {} listOfDesignated "
- + "found the LOWEST priority pdp ID: {} "
- + " It is now the designatedPpd from the perspective of myPdp ID: {} \n\n",
- myPdp.getPdpId(), lowestPriorityPdp.getPdpId(), myPdp);
- designatedPdp = lowestPriorityPdp;
-
- } else if (listOfDesignated.isEmpty()) {
- logger.debug("\nDesignatedWaiter.run: myPdp: {} listOfDesignated is: EMPTY.", myPdp.getPdpId());
- designatedPdp = null;
- } else { //only one in listOfDesignated
- logger.debug("\nDesignatedWaiter.run: myPdp: {} listOfDesignated "
- + "has ONE entry. PDP ID: {}", myPdp.getPdpId(), listOfDesignated.get(0).getPdpId());
- designatedPdp = listOfDesignated.get(0);
}
- return designatedPdp;
+ private DroolsPdp getLowestPriority() {
+ return (lowestPrioritySameSite != null ? lowestPrioritySameSite : lowestPriorityDifferentSite);
+ }
+ }
+
+ private void demoteMyPdp() {
+ try {
+ //Keep the order like this. StateManagement is last since it triggers controller shutdown
+ myPdp.setDesignated(false);
+ pdpsConnector.setDesignated(myPdp, false);
+ isDesignated = false;
+ String standbyStatus = stateManagementFeature.getStandbyStatus();
+ if (!(standbyStatus.equals(StateManagement.HOT_STANDBY)
+ || standbyStatus.equals(StateManagement.COLD_STANDBY))) {
+ /*
+ * Only call demote if it is not already in the right state. Don't worry about
+ * synching the lower level topic endpoint states. That is done by the
+ * refreshStateAudit.
+ */
+ stateManagementFeature.demote();
+ }
+ } catch (Exception e) {
+ myPdp.setDesignated(false);
+ pdpsConnector.setDesignated(myPdp, false);
+ isDesignated = false;
+ logger.error("DesignatedWaiter.run: myPdp: {} Caught Exception attempting to "
+ + "demote myPdp {} myPdp.getPdpId(), message= {}", myPdp.getPdpId(),
+ e);
+ }
}
private class TimerUpdateClass extends TimerTask {
diff --git a/feature-active-standby-management/src/main/java/org/onap/policy/drools/activestandby/JpaDroolsPdpsConnector.java b/feature-active-standby-management/src/main/java/org/onap/policy/drools/activestandby/JpaDroolsPdpsConnector.java
index 078c891f..ed53f55c 100644
--- a/feature-active-standby-management/src/main/java/org/onap/policy/drools/activestandby/JpaDroolsPdpsConnector.java
+++ b/feature-active-standby-management/src/main/java/org/onap/policy/drools/activestandby/JpaDroolsPdpsConnector.java
@@ -61,18 +61,19 @@ public class JpaDroolsPdpsConnector implements DroolsPdpsConnector {
.setFlushMode(FlushModeType.COMMIT).getResultList();
LinkedList<DroolsPdp> droolsPdpsReturnList = new LinkedList<>();
for (Object o : droolsPdpsList) {
- if (o instanceof DroolsPdp) {
- //Make sure it is not a cached version
- em.refresh((DroolsPdpEntity)o);
- droolsPdpsReturnList.add((DroolsPdp)o);
- if (logger.isDebugEnabled()) {
- DroolsPdp droolsPdp = (DroolsPdp)o;
- logger.debug("getDroolsPdps: PDP= {}"
- + ", isDesignated= {}"
- + ", updatedDate= {}"
- + ", priority= {}", droolsPdp.getPdpId(), droolsPdp.isDesignated(),
- droolsPdp.getUpdatedDate(), droolsPdp.getPriority());
- }
+ if (!(o instanceof DroolsPdp)) {
+ continue;
+ }
+ //Make sure it is not a cached version
+ DroolsPdp droolsPdp = (DroolsPdp)o;
+ em.refresh(droolsPdp);
+ droolsPdpsReturnList.add(droolsPdp);
+ if (logger.isDebugEnabled()) {
+ logger.debug("getDroolsPdps: PDP= {}"
+ + ", isDesignated= {}"
+ + ", updatedDate= {}"
+ + ", priority= {}", droolsPdp.getPdpId(), droolsPdp.isDesignated(),
+ droolsPdp.getUpdatedDate(), droolsPdp.getPriority());
}
}
try {
@@ -233,14 +234,7 @@ public class JpaDroolsPdpsConnector implements DroolsPdpsConnector {
+ " found, designated= {}"
+ ", setting to {}", pdp.getPdpId(), droolsPdpEntity.isDesignated(),
designated);
- droolsPdpEntity.setDesignated(designated);
- if (designated) {
- em.refresh(droolsPdpEntity); //make sure we get the DB value
- if (!droolsPdpEntity.isDesignated()) {
- droolsPdpEntity.setDesignatedDate(new Date());
- }
-
- }
+ setPdpDesignation(em, droolsPdpEntity, designated);
em.getTransaction().commit();
} else {
logger.error("setDesignated: PDP={}"
@@ -256,6 +250,17 @@ public class JpaDroolsPdpsConnector implements DroolsPdpsConnector {
}
+ private void setPdpDesignation(EntityManager em, DroolsPdpEntity droolsPdpEntity, boolean designated) {
+ droolsPdpEntity.setDesignated(designated);
+ if (designated) {
+ em.refresh(droolsPdpEntity); //make sure we get the DB value
+ if (!droolsPdpEntity.isDesignated()) {
+ droolsPdpEntity.setDesignatedDate(new Date());
+ }
+
+ }
+ }
+
@Override
public void standDownPdp(String pdpId) {
diff --git a/feature-active-standby-management/src/main/java/org/onap/policy/drools/activestandby/PmStandbyStateChangeNotifier.java b/feature-active-standby-management/src/main/java/org/onap/policy/drools/activestandby/PmStandbyStateChangeNotifier.java
index 3f4ae557..7669cc22 100644
--- a/feature-active-standby-management/src/main/java/org/onap/policy/drools/activestandby/PmStandbyStateChangeNotifier.java
+++ b/feature-active-standby-management/src/main/java/org/onap/policy/drools/activestandby/PmStandbyStateChangeNotifier.java
@@ -125,138 +125,163 @@ public class PmStandbyStateChangeNotifier extends StateChangeNotifier {
if (standbyStatus == null || standbyStatus.equals(StateManagement.NULL_VALUE)) {
logger.debug("handleStateChange: standbyStatus is null; standing down PDP={}", pdpId);
- if (previousStandbyStatus.equals(StateManagement.NULL_VALUE)) {
- // We were just here and did this successfully
- logger.debug("handleStateChange: "
- + "Is returning because standbyStatus is null and was previously 'null'; PDP={}",
- pdpId);
- return;
- }
- isWaitingForActivation = false;
- try {
- logger.debug("handleStateChange: null: cancelling delayActivationTimer.");
- cancelTimer();
- // Only want to lock the endpoints, not the controllers.
- getPolicyEngineManager().deactivate();
- // The operation was fully successful, but you cannot assign it a real null value
- // because later we might try to execute previousStandbyStatus.equals() and get
- // a null pointer exception.
- previousStandbyStatus = StateManagement.NULL_VALUE;
- } catch (Exception e) {
- logger.warn("handleStateChange: standbyStatus == null caught exception: ", e);
- }
+ standDownPdpNull(pdpId);
+
} else if (standbyStatus.equals(StateManagement.HOT_STANDBY)
|| standbyStatus.equals(StateManagement.COLD_STANDBY)) {
logger.debug("handleStateChange: standbyStatus={}; standing down PDP={}", standbyStatus, pdpId);
- if (previousStandbyStatus.equals(PmStandbyStateChangeNotifier.HOTSTANDBY_OR_COLDSTANDBY)) {
- // We were just here and did this successfully
- logger.debug("handleStateChange: Is returning because standbyStatus is {}"
- + " and was previously {}; PDP= {}", standbyStatus, previousStandbyStatus, pdpId);
- return;
- }
- isWaitingForActivation = false;
- try {
- logger.debug("handleStateChange: HOT_STNDBY || COLD_STANDBY: cancelling delayActivationTimer.");
- cancelTimer();
- // Only want to lock the endpoints, not the controllers.
- getPolicyEngineManager().deactivate();
- // The operation was fully successful
- previousStandbyStatus = PmStandbyStateChangeNotifier.HOTSTANDBY_OR_COLDSTANDBY;
- } catch (Exception e) {
- logger.warn("handleStateChange: standbyStatus = {} caught exception: {}", standbyStatus, e.getMessage(),
- e);
- }
+ standDownPdp(pdpId, standbyStatus);
} else if (standbyStatus.equals(StateManagement.PROVIDING_SERVICE)) {
logger.debug("handleStateChange: standbyStatus= {} scheduling activation of PDP={}", standbyStatus,
pdpId);
- if (previousStandbyStatus.equals(StateManagement.PROVIDING_SERVICE)) {
- // We were just here and did this successfully
- logger.debug("handleStateChange: Is returning because standbyStatus is {}"
- + "and was previously {}; PDP={}", standbyStatus, previousStandbyStatus, pdpId);
- return;
- }
- try {
- // UnLock all the endpoints
- logger.debug("handleStateChange: standbyStatus={}; controllers must be unlocked.", standbyStatus);
- /*
- * Only endpoints should be unlocked. Controllers have not been locked. Because,
- * sometimes, it is possible for more than one PDP-D to become active (race
- * conditions) we need to delay the activation of the topic endpoint interfaces to
- * give the election algorithm time to resolve the conflict.
- */
- logger.debug("handleStateChange: PROVIDING_SERVICE isWaitingForActivation= {}",
- isWaitingForActivation);
-
- // Delay activation for 2*pdpUpdateInterval+2000 ms in case of an election handler
- // conflict.
- // You could have multiple election handlers thinking they can take over.
-
- // First let's check that the timer has not died
- if (isWaitingForActivation) {
- logger.debug("handleStateChange: PROVIDING_SERVICE isWaitingForActivation = {}",
- isWaitingForActivation);
- long now = new Date().getTime();
- long waitTimeMs = now - startTimeWaitingForActivationMs;
- if (waitTimeMs > 3 * waitInterval) {
- logger.debug("handleStateChange: PROVIDING_SERVICE looks like the activation wait timer "
- + "may be hung, waitTimeMs = {} and allowable waitInterval = {}"
- + " Checking whether it is currently in activation. isNowActivating = {}",
- waitTimeMs, waitInterval, isNowActivating);
- // Now check that it is not currently executing an activation
- if (!isNowActivating) {
- logger.debug("handleStateChange: PROVIDING_SERVICE looks like the activation "
- + "wait timer died");
- // This will assure the timer is cancelled and rescheduled.
- isWaitingForActivation = false;
- }
- }
+ schedulePdpActivation(pdpId, standbyStatus);
- }
+ } else {
+ logger.error("handleStateChange: Unsupported standbyStatus={}; standing down PDP={}", standbyStatus, pdpId);
+ standDownPdpUnsupported(pdpId, standbyStatus);
+ }
- if (!isWaitingForActivation) {
- // Just in case there is an old timer hanging around
- logger.debug("handleStateChange: PROVIDING_SERVICE cancelling delayActivationTimer.");
- cancelTimer();
- delayActivateTimer = makeTimer();
- // delay the activate so the DesignatedWaiter can run twice
- delayActivateTimer.schedule(new DelayActivateClass(), waitInterval);
- isWaitingForActivation = true;
- startTimeWaitingForActivationMs = new Date().getTime();
- logger.debug("handleStateChange: PROVIDING_SERVICE scheduling delayActivationTimer in {} ms",
- waitInterval);
- } else {
- logger.debug("handleStateChange: PROVIDING_SERVICE delayActivationTimer is "
- + "waiting for activation.");
- }
+ logger.debug("handleStateChange: Exiting");
+ }
- } catch (Exception e) {
- logger.warn("handleStateChange: PROVIDING_SERVICE standbyStatus == providingservice caught exception: ",
- e);
- }
+ private void standDownPdpNull(String pdpId) {
+ if (previousStandbyStatus.equals(StateManagement.NULL_VALUE)) {
+ // We were just here and did this successfully
+ logger.debug("handleStateChange: "
+ + "Is returning because standbyStatus is null and was previously 'null'; PDP={}",
+ pdpId);
+ return;
+ }
- } else {
- logger.error("handleStateChange: Unsupported standbyStatus={}; standing down PDP={}", standbyStatus, pdpId);
- if (previousStandbyStatus.equals(PmStandbyStateChangeNotifier.UNSUPPORTED)) {
- // We were just here and did this successfully
- logger.debug("handleStateChange: Is returning because standbyStatus is "
- + "UNSUPPORTED and was previously {}; PDP={}", previousStandbyStatus, pdpId);
- return;
- }
+ isWaitingForActivation = false;
+ try {
+ logger.debug("handleStateChange: null: cancelling delayActivationTimer.");
+ cancelTimer();
// Only want to lock the endpoints, not the controllers.
- isWaitingForActivation = false;
- try {
- logger.debug("handleStateChange: unsupported standbystatus: cancelling delayActivationTimer.");
+ getPolicyEngineManager().deactivate();
+ // The operation was fully successful, but you cannot assign it a real null value
+ // because later we might try to execute previousStandbyStatus.equals() and get
+ // a null pointer exception.
+ previousStandbyStatus = StateManagement.NULL_VALUE;
+ } catch (Exception e) {
+ logger.warn("handleStateChange: standbyStatus == null caught exception: ", e);
+ }
+ }
+
+ private void standDownPdp(String pdpId, String standbyStatus) {
+ if (previousStandbyStatus.equals(PmStandbyStateChangeNotifier.HOTSTANDBY_OR_COLDSTANDBY)) {
+ // We were just here and did this successfully
+ logger.debug("handleStateChange: Is returning because standbyStatus is {}"
+ + " and was previously {}; PDP= {}", standbyStatus, previousStandbyStatus, pdpId);
+ return;
+ }
+
+ isWaitingForActivation = false;
+ try {
+ logger.debug("handleStateChange: HOT_STNDBY || COLD_STANDBY: cancelling delayActivationTimer.");
+ cancelTimer();
+ // Only want to lock the endpoints, not the controllers.
+ getPolicyEngineManager().deactivate();
+ // The operation was fully successful
+ previousStandbyStatus = PmStandbyStateChangeNotifier.HOTSTANDBY_OR_COLDSTANDBY;
+ } catch (Exception e) {
+ logger.warn("handleStateChange: standbyStatus = {} caught exception: {}", standbyStatus, e.getMessage(),
+ e);
+ }
+ }
+
+ private void schedulePdpActivation(String pdpId, String standbyStatus) {
+ if (previousStandbyStatus.equals(StateManagement.PROVIDING_SERVICE)) {
+ // We were just here and did this successfully
+ logger.debug("handleStateChange: Is returning because standbyStatus is {}"
+ + "and was previously {}; PDP={}", standbyStatus, previousStandbyStatus, pdpId);
+ return;
+ }
+
+ try {
+ // UnLock all the endpoints
+ logger.debug("handleStateChange: standbyStatus={}; controllers must be unlocked.", standbyStatus);
+ /*
+ * Only endpoints should be unlocked. Controllers have not been locked. Because,
+ * sometimes, it is possible for more than one PDP-D to become active (race
+ * conditions) we need to delay the activation of the topic endpoint interfaces to
+ * give the election algorithm time to resolve the conflict.
+ */
+ logger.debug("handleStateChange: PROVIDING_SERVICE isWaitingForActivation= {}",
+ isWaitingForActivation);
+
+ // Delay activation for 2*pdpUpdateInterval+2000 ms in case of an election handler
+ // conflict.
+ // You could have multiple election handlers thinking they can take over.
+
+ // First let's check that the timer has not died
+ checkTimerStatus();
+
+ if (!isWaitingForActivation) {
+ // Just in case there is an old timer hanging around
+ logger.debug("handleStateChange: PROVIDING_SERVICE cancelling delayActivationTimer.");
cancelTimer();
- getPolicyEngineManager().deactivate();
- // We know the standbystatus is unsupported
- previousStandbyStatus = PmStandbyStateChangeNotifier.UNSUPPORTED;
- } catch (Exception e) {
- logger.warn("handleStateChange: Unsupported standbyStatus = {} " + "caught exception: {} ",
- standbyStatus, e.getMessage(), e);
+ delayActivateTimer = makeTimer();
+ // delay the activate so the DesignatedWaiter can run twice
+ delayActivateTimer.schedule(new DelayActivateClass(), waitInterval);
+ isWaitingForActivation = true;
+ startTimeWaitingForActivationMs = new Date().getTime();
+ logger.debug("handleStateChange: PROVIDING_SERVICE scheduling delayActivationTimer in {} ms",
+ waitInterval);
+ } else {
+ logger.debug("handleStateChange: PROVIDING_SERVICE delayActivationTimer is "
+ + "waiting for activation.");
}
+
+ } catch (Exception e) {
+ logger.warn("handleStateChange: PROVIDING_SERVICE standbyStatus == providingservice caught exception: ",
+ e);
+ }
+ }
+
+ private void checkTimerStatus() {
+ if (isWaitingForActivation) {
+ logger.debug("handleStateChange: PROVIDING_SERVICE isWaitingForActivation = {}",
+ isWaitingForActivation);
+ long now = new Date().getTime();
+ long waitTimeMs = now - startTimeWaitingForActivationMs;
+ if (waitTimeMs > 3 * waitInterval) {
+ logger.debug("handleStateChange: PROVIDING_SERVICE looks like the activation wait timer "
+ + "may be hung, waitTimeMs = {} and allowable waitInterval = {}"
+ + " Checking whether it is currently in activation. isNowActivating = {}",
+ waitTimeMs, waitInterval, isNowActivating);
+ // Now check that it is not currently executing an activation
+ if (!isNowActivating) {
+ logger.debug("handleStateChange: PROVIDING_SERVICE looks like the activation "
+ + "wait timer died");
+ // This will assure the timer is cancelled and rescheduled.
+ isWaitingForActivation = false;
+ }
+ }
+ }
+ }
+
+ private void standDownPdpUnsupported(String pdpId, String standbyStatus) {
+ if (previousStandbyStatus.equals(PmStandbyStateChangeNotifier.UNSUPPORTED)) {
+ // We were just here and did this successfully
+ logger.debug("handleStateChange: Is returning because standbyStatus is "
+ + "UNSUPPORTED and was previously {}; PDP={}", previousStandbyStatus, pdpId);
+ return;
+ }
+
+ // Only want to lock the endpoints, not the controllers.
+ isWaitingForActivation = false;
+ try {
+ logger.debug("handleStateChange: unsupported standbystatus: cancelling delayActivationTimer.");
+ cancelTimer();
+ getPolicyEngineManager().deactivate();
+ // We know the standbystatus is unsupported
+ previousStandbyStatus = PmStandbyStateChangeNotifier.UNSUPPORTED;
+ } catch (Exception e) {
+ logger.warn("handleStateChange: Unsupported standbyStatus = {} " + "caught exception: {} ",
+ standbyStatus, e.getMessage(), e);
}
- logger.debug("handleStateChange: Exiting");
}
private void cancelTimer() {
diff --git a/feature-simulators/src/main/java/org/onap/policy/drools/simulators/DMaaPSimulatorJaxRs.java b/feature-simulators/src/main/java/org/onap/policy/drools/simulators/DMaaPSimulatorJaxRs.java
index 8b653aae..dd0fb7b8 100644
--- a/feature-simulators/src/main/java/org/onap/policy/drools/simulators/DMaaPSimulatorJaxRs.java
+++ b/feature-simulators/src/main/java/org/onap/policy/drools/simulators/DMaaPSimulatorJaxRs.java
@@ -99,7 +99,7 @@ public class DMaaPSimulatorJaxRs {
return response;
}
- private String waitForNextMessageFromQueue(int timeout, String topicName) {
+ protected String waitForNextMessageFromQueue(int timeout, String topicName) {
try {
sleep(timeout);
if (queues.containsKey(topicName)) {
@@ -129,10 +129,7 @@ public class DMaaPSimulatorJaxRs {
@Consumes(MediaType.TEXT_PLAIN)
public String publish(@PathParam("topicName") String topicName, String body) {
BlockingQueue<String> queue = queues.computeIfAbsent(topicName, entry -> new LinkedBlockingQueue<>());
-
- if (!queue.offer(body)) {
- logger.warn("error on topic {}, failed to place body {} on queue", topicName, body);
- }
+ queue.offer(body);
return "";
}
diff --git a/feature-simulators/src/test/java/org/onap/policy/drools/simulators/DMaaPSimulatorJaxRsTest.java b/feature-simulators/src/test/java/org/onap/policy/drools/simulators/DMaaPSimulatorJaxRsTest.java
index 7200bdce..b1275004 100644
--- a/feature-simulators/src/test/java/org/onap/policy/drools/simulators/DMaaPSimulatorJaxRsTest.java
+++ b/feature-simulators/src/test/java/org/onap/policy/drools/simulators/DMaaPSimulatorJaxRsTest.java
@@ -21,6 +21,7 @@
package org.onap.policy.drools.simulators;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.doThrow;
import java.io.IOException;
@@ -30,6 +31,7 @@ import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
import javax.servlet.http.HttpServletResponse;
import org.junit.After;
import org.junit.Before;
@@ -130,6 +132,16 @@ public class DMaaPSimulatorJaxRsTest {
@Test
public void testWaitForNextMessageFromQueue() throws InterruptedException {
+ CountDownLatch waitCalled = new CountDownLatch(1);
+
+ sim = new DMaaPSimulatorJaxRs() {
+ @Override
+ protected String waitForNextMessageFromQueue(int timeout, String topicName) {
+ waitCalled.countDown();
+ return super.waitForNextMessageFromQueue(timeout, topicName);
+ }
+ };
+
BlockingQueue<String> queue = new LinkedBlockingQueue<>();
CountDownLatch latch1 = backgroundSubscribe(queue);
@@ -143,7 +155,7 @@ public class DMaaPSimulatorJaxRsTest {
* Must pause to prevent the topic from being created before subscribe() is
* invoked.
*/
- Thread.sleep(LONG_TIMEOUT_MS / 3);
+ assertTrue(waitCalled.await(1, TimeUnit.SECONDS));
// only publish one message
sim.publish(TOPIC, MESSAGE);
diff --git a/feature-state-management/src/main/java/org/onap/policy/drools/statemanagement/RepositoryAudit.java b/feature-state-management/src/main/java/org/onap/policy/drools/statemanagement/RepositoryAudit.java
index 9d39a7bf..dc0f3784 100644
--- a/feature-state-management/src/main/java/org/onap/policy/drools/statemanagement/RepositoryAudit.java
+++ b/feature-state-management/src/main/java/org/onap/policy/drools/statemanagement/RepositoryAudit.java
@@ -30,9 +30,9 @@ import java.nio.file.Path;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.LinkedList;
+import java.util.List;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -73,96 +73,172 @@ public class RepositoryAudit extends DroolsPdpIntegrityMonitor.AuditBase {
throws IOException, InterruptedException {
logger.debug("Running 'RepositoryAudit.invoke'");
- boolean isActive = true;
- // ignore errors by default
- boolean ignoreErrors = true;
- String repoAuditIsActive = StateManagementProperties.getProperty("repository.audit.is.active");
- String repoAuditIgnoreErrors =
- StateManagementProperties.getProperty("repository.audit.ignore.errors");
+ InvokeData data = new InvokeData();
+
logger.debug("RepositoryAudit.invoke: repoAuditIsActive = {}"
- + ", repoAuditIgnoreErrors = {}",repoAuditIsActive, repoAuditIgnoreErrors);
-
- if (repoAuditIsActive != null) {
- try {
- isActive = Boolean.parseBoolean(repoAuditIsActive.trim());
- } catch (NumberFormatException e) {
- logger.warn("RepositoryAudit.invoke: Ignoring invalid property: repository.audit.is.active = {}",
- repoAuditIsActive);
- }
- }
+ + ", repoAuditIgnoreErrors = {}",data.repoAuditIsActive, data.repoAuditIgnoreErrors);
- if (!isActive) {
- logger.info("RepositoryAudit.invoke: exiting because isActive = {}", isActive);
+ data.initIsActive();
+
+ if (!data.isActive) {
+ logger.info("RepositoryAudit.invoke: exiting because isActive = {}", data.isActive);
return;
}
- if (repoAuditIgnoreErrors != null) {
- try {
- ignoreErrors = Boolean.parseBoolean(repoAuditIgnoreErrors.trim());
- } catch (NumberFormatException e) {
- ignoreErrors = true;
- logger.warn("RepositoryAudit.invoke: Ignoring invalid property: repository.audit.ignore.errors = {}",
- repoAuditIgnoreErrors);
- }
- } else {
- ignoreErrors = true;
- }
+ data.initIgnoreErrors();
+ data.initTimeout();
- // Fetch repository information from 'IntegrityMonitorProperties'
- String repositoryId =
- StateManagementProperties.getProperty("repository.audit.id");
- String repositoryUrl =
- StateManagementProperties.getProperty("repository.audit.url");
- String repositoryUsername =
- StateManagementProperties.getProperty("repository.audit.username");
- String repositoryPassword =
- StateManagementProperties.getProperty("repository.audit.password");
- boolean upload =
- repositoryId != null && repositoryUrl != null
- && repositoryUsername != null && repositoryPassword != null;
+ /*
+ * 1) create temporary directory
+ */
+ data.dir = Files.createTempDirectory("auditRepo");
+ logger.info("RepositoryAudit: temporary directory = {}", data.dir);
- // used to incrementally construct response as problems occur
- // (empty = no problems)
- StringBuilder response = new StringBuilder();
-
- long timeoutInSeconds = DEFAULT_TIMEOUT;
- String timeoutString =
- StateManagementProperties.getProperty("repository.audit.timeout");
- if (timeoutString != null && !timeoutString.isEmpty()) {
- try {
- timeoutInSeconds = Long.valueOf(timeoutString);
- } catch (NumberFormatException e) {
- logger.error("RepositoryAudit: Invalid 'repository.audit.timeout' value: '{}'",
- timeoutString, e);
- if (!ignoreErrors) {
- response.append("Invalid 'repository.audit.timeout' value: '")
- .append(timeoutString).append("'\n");
- setResponse(response.toString());
- }
- }
+ // nested 'pom.xml' file and 'repo' directory
+ final Path pom = data.dir.resolve("pom.xml");
+ final Path repo = data.dir.resolve("repo");
+
+ /*
+ * 2) Create test file, and upload to repository
+ * (only if repository information is specified)
+ */
+ if (data.upload) {
+ data.uploadTestFile();
}
- // artifacts to be downloaded
- LinkedList<Artifact> artifacts = new LinkedList<>();
+ /*
+ * 3) create 'pom.xml' file in temporary directory
+ */
+ data.createPomFile(repo, pom);
/*
- * 1) create temporary directory
+ * 4) Invoke external 'mvn' process to do the downloads
*/
- Path dir = Files.createTempDirectory("auditRepo");
- logger.info("RepositoryAudit: temporary directory = {}", dir);
- // nested 'pom.xml' file and 'repo' directory
- final Path pom = dir.resolve("pom.xml");
- final Path repo = dir.resolve("repo");
+ // output file = ${dir}/out (this supports step '4a')
+ File output = data.dir.resolve("out").toFile();
+
+ // invoke process, and wait for response
+ int rval = data.runMaven(output);
/*
- * 2) Create test file, and upload to repository
+ * 4a) Check attempted and successful downloads from output file
+ * Note: at present, this step just generates log messages,
+ * but doesn't do any verification.
+ */
+ if (rval == 0 && output != null) {
+ generateDownloadLogs(output);
+ }
+
+ /*
+ * 5) Check the contents of the directory to make sure the downloads
+ * were successful
+ */
+ data.verifyDownloads(repo);
+
+ /*
+ * 6) Use 'curl' to delete the uploaded test file
* (only if repository information is specified)
*/
- String groupId = null;
- String artifactId = null;
- String version = null;
- if (upload) {
+ if (data.upload) {
+ data.deleteUploadedTestFile();
+ }
+
+ /*
+ * 7) Remove the temporary directory
+ */
+ Files.walkFileTree(data.dir, new RecursivelyDeleteDirectory());
+ }
+
+ private class InvokeData {
+ private boolean isActive = true;
+
+ // ignore errors by default
+ private boolean ignoreErrors = true;
+
+ private final String repoAuditIsActive;
+ private final String repoAuditIgnoreErrors;
+
+ private final String repositoryId;
+ private final String repositoryUrl;
+ private final String repositoryUsername;
+ private final String repositoryPassword;
+ private final boolean upload;
+
+ // used to incrementally construct response as problems occur
+ // (empty = no problems)
+ private final StringBuilder response = new StringBuilder();
+
+ private long timeoutInSeconds = DEFAULT_TIMEOUT;
+
+ private Path dir;
+
+ private String groupId = null;
+ private String artifactId = null;
+ private String version = null;
+
+ // artifacts to be downloaded
+ private final List<Artifact> artifacts = new LinkedList<>();
+
+ public InvokeData() {
+ repoAuditIsActive = StateManagementProperties.getProperty("repository.audit.is.active");
+ repoAuditIgnoreErrors = StateManagementProperties.getProperty("repository.audit.ignore.errors");
+
+ // Fetch repository information from 'IntegrityMonitorProperties'
+ repositoryId = StateManagementProperties.getProperty("repository.audit.id");
+ repositoryUrl = StateManagementProperties.getProperty("repository.audit.url");
+ repositoryUsername = StateManagementProperties.getProperty("repository.audit.username");
+ repositoryPassword = StateManagementProperties.getProperty("repository.audit.password");
+
+ upload = repositoryId != null && repositoryUrl != null
+ && repositoryUsername != null && repositoryPassword != null;
+ }
+
+ public void initIsActive() {
+ if (repoAuditIsActive != null) {
+ try {
+ isActive = Boolean.parseBoolean(repoAuditIsActive.trim());
+ } catch (NumberFormatException e) {
+ logger.warn("RepositoryAudit.invoke: Ignoring invalid property: repository.audit.is.active = {}",
+ repoAuditIsActive);
+ }
+ }
+ }
+
+ public void initIgnoreErrors() {
+ if (repoAuditIgnoreErrors != null) {
+ try {
+ ignoreErrors = Boolean.parseBoolean(repoAuditIgnoreErrors.trim());
+ } catch (NumberFormatException e) {
+ ignoreErrors = true;
+ logger.warn(
+ "RepositoryAudit.invoke: Ignoring invalid property: repository.audit.ignore.errors = {}",
+ repoAuditIgnoreErrors);
+ }
+ } else {
+ ignoreErrors = true;
+ }
+ }
+
+ public void initTimeout() {
+ String timeoutString =
+ StateManagementProperties.getProperty("repository.audit.timeout");
+ if (timeoutString != null && !timeoutString.isEmpty()) {
+ try {
+ timeoutInSeconds = Long.valueOf(timeoutString);
+ } catch (NumberFormatException e) {
+ logger.error("RepositoryAudit: Invalid 'repository.audit.timeout' value: '{}'",
+ timeoutString, e);
+ if (!ignoreErrors) {
+ response.append("Invalid 'repository.audit.timeout' value: '")
+ .append(timeoutString).append("'\n");
+ setResponse(response.toString());
+ }
+ }
+ }
+ }
+
+ private void uploadTestFile() throws IOException, InterruptedException {
groupId = "org.onap.policy.audit";
artifactId = "repository-audit";
version = "0." + System.currentTimeMillis();
@@ -204,155 +280,105 @@ public class RepositoryAudit extends DroolsPdpIntegrityMonitor.AuditBase {
}
}
- /*
- * 3) create 'pom.xml' file in temporary directory
- */
- artifacts.add(new Artifact("org.apache.maven/maven-embedder/3.2.2"));
-
- StringBuilder sb = new StringBuilder();
- sb.append("<project xmlns=\"http://maven.apache.org/POM/4.0.0\" xmlns:xsi=\"http://www.w3.org/2001/XMLSchema-instance\"\n"
- + " xsi:schemaLocation=\"http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd\">\n"
- + "\n"
- + " <modelVersion>4.0.0</modelVersion>\n"
- + " <groupId>empty</groupId>\n"
- + " <artifactId>empty</artifactId>\n"
- + " <version>1.0-SNAPSHOT</version>\n"
- + " <packaging>pom</packaging>\n"
- + "\n"
- + " <build>\n"
- + " <plugins>\n"
- + " <plugin>\n"
- + " <groupId>org.apache.maven.plugins</groupId>\n"
- + " <artifactId>maven-dependency-plugin</artifactId>\n"
- + " <version>2.10</version>\n"
- + " <executions>\n"
- + " <execution>\n"
- + " <id>copy</id>\n"
- + " <goals>\n"
- + " <goal>copy</goal>\n"
- + " </goals>\n"
- + " <configuration>\n"
- + " <localRepositoryDirectory>")
- .append(repo)
- .append("</localRepositoryDirectory>\n")
- .append(" <artifactItems>\n");
-
- for (Artifact artifact : artifacts) {
- // each artifact results in an 'artifactItem' element
- sb.append(" <artifactItem>\n"
- + " <groupId>")
- .append(artifact.groupId)
- .append("</groupId>\n"
- + " <artifactId>")
- .append(artifact.artifactId)
- .append("</artifactId>\n"
- + " <version>")
- .append(artifact.version)
- .append("</version>\n"
- + " <type>")
- .append(artifact.type)
- .append("</type>\n"
- + " </artifactItem>\n");
- }
- sb.append(" </artifactItems>\n"
- + " </configuration>\n"
- + " </execution>\n"
- + " </executions>\n"
- + " </plugin>\n"
- + " </plugins>\n"
- + " </build>\n"
- + "</project>\n");
-
- try (FileOutputStream fos = new FileOutputStream(pom.toFile())) {
- fos.write(sb.toString().getBytes());
- }
-
- /*
- * 4) Invoke external 'mvn' process to do the downloads
- */
-
- // output file = ${dir}/out (this supports step '4a')
- File output = dir.resolve("out").toFile();
-
- // invoke process, and wait for response
- int rval = runProcess(timeoutInSeconds, dir.toFile(), output, "mvn", "compile");
- logger.info("RepositoryAudit: 'mvn' return value = {}", rval);
- if (rval != 0) {
- logger.error("RepositoryAudit: 'mvn compile' invocation failed");
- if (!ignoreErrors) {
- response.append("'mvn compile' invocation failed\n");
- setResponse(response.toString());
+ private void createPomFile(final Path repo, final Path pom)
+ throws IOException {
+
+ artifacts.add(new Artifact("org.apache.maven/maven-embedder/3.2.2"));
+
+ StringBuilder sb = new StringBuilder();
+ sb.append("<project xmlns=\"http://maven.apache.org/POM/4.0.0\" xmlns:xsi=\"http://www.w3.org/2001/XMLSchema-instance\"\n"
+ + " xsi:schemaLocation=\"http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd\">\n"
+ + "\n"
+ + " <modelVersion>4.0.0</modelVersion>\n"
+ + " <groupId>empty</groupId>\n"
+ + " <artifactId>empty</artifactId>\n"
+ + " <version>1.0-SNAPSHOT</version>\n"
+ + " <packaging>pom</packaging>\n"
+ + "\n"
+ + " <build>\n"
+ + " <plugins>\n"
+ + " <plugin>\n"
+ + " <groupId>org.apache.maven.plugins</groupId>\n"
+ + " <artifactId>maven-dependency-plugin</artifactId>\n"
+ + " <version>2.10</version>\n"
+ + " <executions>\n"
+ + " <execution>\n"
+ + " <id>copy</id>\n"
+ + " <goals>\n"
+ + " <goal>copy</goal>\n"
+ + " </goals>\n"
+ + " <configuration>\n"
+ + " <localRepositoryDirectory>")
+ .append(repo)
+ .append("</localRepositoryDirectory>\n")
+ .append(" <artifactItems>\n");
+
+ for (Artifact artifact : artifacts) {
+ // each artifact results in an 'artifactItem' element
+ sb.append(" <artifactItem>\n"
+ + " <groupId>")
+ .append(artifact.groupId)
+ .append("</groupId>\n"
+ + " <artifactId>")
+ .append(artifact.artifactId)
+ .append("</artifactId>\n"
+ + " <version>")
+ .append(artifact.version)
+ .append("</version>\n"
+ + " <type>")
+ .append(artifact.type)
+ .append("</type>\n"
+ + " </artifactItem>\n");
}
- }
-
- /*
- * 4a) Check attempted and successful downloads from output file
- * Note: at present, this step just generates log messages,
- * but doesn't do any verification.
- */
- if (rval == 0 && output != null) {
- // place output in 'fileContents' (replacing the Return characters
- // with Newline)
- byte[] outputData = new byte[(int)output.length()];
- String fileContents;
- try (FileInputStream fis = new FileInputStream(output)) {
- //
- // Ideally this should be in a loop or even better use
- // Java 8 nio functionality.
- //
- int bytesRead = fis.read(outputData);
- logger.info("fileContents read {} bytes", bytesRead);
- fileContents = new String(outputData).replace('\r','\n');
+ sb.append(" </artifactItems>\n"
+ + " </configuration>\n"
+ + " </execution>\n"
+ + " </executions>\n"
+ + " </plugin>\n"
+ + " </plugins>\n"
+ + " </build>\n"
+ + "</project>\n");
+
+ try (FileOutputStream fos = new FileOutputStream(pom.toFile())) {
+ fos.write(sb.toString().getBytes());
}
+ }
- // generate log messages from 'Downloading' and 'Downloaded'
- // messages within the 'mvn' output
- int index = 0;
- while ((index = fileContents.indexOf("\nDown", index)) > 0) {
- index += 5;
- if (fileContents.regionMatches(index, "loading: ", 0, 9)) {
- index += 9;
- int endIndex = fileContents.indexOf('\n', index);
- logger.info("RepositoryAudit: Attempted download: '{}'",
- fileContents.substring(index, endIndex));
- index = endIndex;
- } else if (fileContents.regionMatches(index, "loaded: ", 0, 8)) {
- index += 8;
- int endIndex = fileContents.indexOf(' ', index);
- logger.info("RepositoryAudit: Successful download: '{}'",fileContents.substring(index, endIndex));
- index = endIndex;
+ private int runMaven(File output) throws IOException, InterruptedException {
+ int rval = runProcess(timeoutInSeconds, dir.toFile(), output, "mvn", "compile");
+ logger.info("RepositoryAudit: 'mvn' return value = {}", rval);
+ if (rval != 0) {
+ logger.error("RepositoryAudit: 'mvn compile' invocation failed");
+ if (!ignoreErrors) {
+ response.append("'mvn compile' invocation failed\n");
+ setResponse(response.toString());
}
}
+ return rval;
}
- /*
- * 5) Check the contents of the directory to make sure the downloads
- * were successful
- */
- for (Artifact artifact : artifacts) {
- if (repo.resolve(artifact.groupId.replace('.','/'))
- .resolve(artifact.artifactId)
- .resolve(artifact.version)
- .resolve(artifact.artifactId + "-" + artifact.version + "."
- + artifact.type).toFile().exists()) {
- // artifact exists, as expected
- logger.info("RepositoryAudit: {} : exists", artifact.toString());
- } else {
- // Audit ERROR: artifact download failed for some reason
- logger.error("RepositoryAudit: {}: does not exist", artifact.toString());
- if (!ignoreErrors) {
- response.append("Failed to download artifact: ")
- .append(artifact).append('\n');
- setResponse(response.toString());
+ private void verifyDownloads(final Path repo) {
+ for (Artifact artifact : artifacts) {
+ if (repo.resolve(artifact.groupId.replace('.','/'))
+ .resolve(artifact.artifactId)
+ .resolve(artifact.version)
+ .resolve(artifact.artifactId + "-" + artifact.version + "."
+ + artifact.type).toFile().exists()) {
+ // artifact exists, as expected
+ logger.info("RepositoryAudit: {} : exists", artifact.toString());
+ } else {
+ // Audit ERROR: artifact download failed for some reason
+ logger.error("RepositoryAudit: {}: does not exist", artifact.toString());
+ if (!ignoreErrors) {
+ response.append("Failed to download artifact: ")
+ .append(artifact).append('\n');
+ setResponse(response.toString());
+ }
}
}
}
- /*
- * 6) Use 'curl' to delete the uploaded test file
- * (only if repository information is specified)
- */
- if (upload) {
+ private void deleteUploadedTestFile() throws IOException, InterruptedException {
if (runProcess(timeoutInSeconds, dir.toFile(), null,
"curl",
"--request", "DELETE",
@@ -370,11 +396,41 @@ public class RepositoryAudit extends DroolsPdpIntegrityMonitor.AuditBase {
artifacts.add(new Artifact(groupId, artifactId, version, "txt"));
}
}
+ }
- /*
- * 7) Remove the temporary directory
- */
- Files.walkFileTree(dir, new RecursivelyDeleteDirectory());
+ private void generateDownloadLogs(File output) throws IOException {
+ // place output in 'fileContents' (replacing the Return characters
+ // with Newline)
+ byte[] outputData = new byte[(int)output.length()];
+ String fileContents;
+ try (FileInputStream fis = new FileInputStream(output)) {
+ //
+ // Ideally this should be in a loop or even better use
+ // Java 8 nio functionality.
+ //
+ int bytesRead = fis.read(outputData);
+ logger.info("fileContents read {} bytes", bytesRead);
+ fileContents = new String(outputData).replace('\r','\n');
+ }
+
+ // generate log messages from 'Downloading' and 'Downloaded'
+ // messages within the 'mvn' output
+ int index = 0;
+ while ((index = fileContents.indexOf("\nDown", index)) > 0) {
+ index += 5;
+ if (fileContents.regionMatches(index, "loading: ", 0, 9)) {
+ index += 9;
+ int endIndex = fileContents.indexOf('\n', index);
+ logger.info("RepositoryAudit: Attempted download: '{}'",
+ fileContents.substring(index, endIndex));
+ index = endIndex;
+ } else if (fileContents.regionMatches(index, "loaded: ", 0, 8)) {
+ index += 8;
+ int endIndex = fileContents.indexOf(' ', index);
+ logger.info("RepositoryAudit: Successful download: '{}'",fileContents.substring(index, endIndex));
+ index = endIndex;
+ }
+ }
}
/**
diff --git a/policy-core/src/main/java/org/onap/policy/drools/core/PolicyContainer.java b/policy-core/src/main/java/org/onap/policy/drools/core/PolicyContainer.java
index 584b3845..bd97ddb1 100644
--- a/policy-core/src/main/java/org/onap/policy/drools/core/PolicyContainer.java
+++ b/policy-core/src/main/java/org/onap/policy/drools/core/PolicyContainer.java
@@ -448,24 +448,30 @@ public class PolicyContainer implements Startable {
*/
public synchronized void startScanner(ReleaseId releaseId) {
String version = releaseId.getVersion();
- if (!scannerStarted && scanner == null && version != null
- && ("LATEST".equals(version) || "RELEASE".equals(version) || version.endsWith("-SNAPSHOT"))) {
- // create the scanner, and poll at 60 second intervals
- try {
- scannerStarted = true;
-
- // start this in a separate thread -- it can block for a long time
- new Thread("Scanner Starter " + getName()) {
- @Override
- public void run() {
- scanner = kieServices.newKieScanner(kieContainer);
- scanner.start(60000L);
- }
- }.start();
- } catch (Exception e) {
- // sometimes the scanner initialization fails for some reason
- logger.error("startScanner error", e);
- }
+
+ if (scannerStarted || scanner != null || version == null) {
+ return;
+ }
+
+ if (!("LATEST".equals(version) || "RELEASE".equals(version) || version.endsWith("-SNAPSHOT"))) {
+ return;
+ }
+
+ // create the scanner, and poll at 60 second intervals
+ try {
+ scannerStarted = true;
+
+ // start this in a separate thread -- it can block for a long time
+ new Thread("Scanner Starter " + getName()) {
+ @Override
+ public void run() {
+ scanner = kieServices.newKieScanner(kieContainer);
+ scanner.start(60000L);
+ }
+ }.start();
+ } catch (Exception e) {
+ // sometimes the scanner initialization fails for some reason
+ logger.error("startScanner error", e);
}
}
diff --git a/policy-management/pom.xml b/policy-management/pom.xml
index 857e62cf..60175f6c 100644
--- a/policy-management/pom.xml
+++ b/policy-management/pom.xml
@@ -330,5 +330,12 @@
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.awaitility</groupId>
+ <artifactId>awaitility</artifactId>
+ <version>3.0.0</version>
+ <scope>test</scope>
+ </dependency>
+
</dependencies>
</project>
diff --git a/policy-management/src/main/java/org/onap/policy/drools/controller/IndexedDroolsControllerFactory.java b/policy-management/src/main/java/org/onap/policy/drools/controller/IndexedDroolsControllerFactory.java
index 733a492d..a4c546f8 100644
--- a/policy-management/src/main/java/org/onap/policy/drools/controller/IndexedDroolsControllerFactory.java
+++ b/policy-management/src/main/java/org/onap/policy/drools/controller/IndexedDroolsControllerFactory.java
@@ -171,8 +171,6 @@ class IndexedDroolsControllerFactory implements DroolsControllerFactory {
protected List<TopicCoderFilterConfiguration> codersAndFilters(Properties properties,
List<? extends Topic> topicEntities) {
- String propertyTopicEntityPrefix;
-
List<TopicCoderFilterConfiguration> topics2DecodedClasses2Filters = new ArrayList<>();
if (topicEntities == null || topicEntities.isEmpty()) {
@@ -181,87 +179,102 @@ class IndexedDroolsControllerFactory implements DroolsControllerFactory {
for (Topic topic : topicEntities) {
- /* source or sink ? ueb or dmaap? */
- boolean isSource = topic instanceof TopicSource;
- CommInfrastructure commInfra = topic.getTopicCommInfrastructure();
- if (commInfra == CommInfrastructure.UEB) {
- if (isSource) {
- propertyTopicEntityPrefix = PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS + ".";
- } else {
- propertyTopicEntityPrefix = PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS + ".";
- }
- } else if (commInfra == CommInfrastructure.DMAAP) {
- if (isSource) {
- propertyTopicEntityPrefix = PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + ".";
- } else {
- propertyTopicEntityPrefix = PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + ".";
- }
- } else if (commInfra == CommInfrastructure.NOOP) {
- if (isSource) {
- propertyTopicEntityPrefix = PolicyEndPointProperties.PROPERTY_NOOP_SOURCE_TOPICS + ".";
- } else {
- propertyTopicEntityPrefix = PolicyEndPointProperties.PROPERTY_NOOP_SINK_TOPICS + ".";
- }
- } else {
- throw new IllegalArgumentException("Invalid Communication Infrastructure: " + commInfra);
- }
-
// 1. first the topic
String firstTopic = topic.getTopic();
+ String propertyTopicEntityPrefix = getPropertyTopicPrefix(topic) + firstTopic;
+
// 2. check if there is a custom decoder for this topic that the user prefers to use
// instead of the ones provided in the platform
- String customGson = properties.getProperty(propertyTopicEntityPrefix + firstTopic
- + PolicyEndPointProperties.PROPERTY_TOPIC_EVENTS_CUSTOM_MODEL_CODER_GSON_SUFFIX);
-
- CustomGsonCoder customGsonCoder = null;
- if (customGson != null && !customGson.isEmpty()) {
- try {
- customGsonCoder = new CustomGsonCoder(customGson);
- } catch (IllegalArgumentException e) {
- logger.warn("{}: cannot create custom-gson-coder {} because of {}", this, customGson,
- e.getMessage(), e);
- }
- }
+ CustomGsonCoder customGsonCoder = getCustomCoder(properties, propertyTopicEntityPrefix);
// 3. second the list of classes associated with each topic
String eventClasses = properties
- .getProperty(propertyTopicEntityPrefix + firstTopic
- + PolicyEndPointProperties.PROPERTY_TOPIC_EVENTS_SUFFIX);
+ .getProperty(propertyTopicEntityPrefix + PolicyEndPointProperties.PROPERTY_TOPIC_EVENTS_SUFFIX);
if (eventClasses == null || eventClasses.isEmpty()) {
logger.warn("There are no event classes for topic {}", firstTopic);
continue;
}
- List<PotentialCoderFilter> classes2Filters = new ArrayList<>();
+ List<PotentialCoderFilter> classes2Filters =
+ getFilterExpressions(properties, propertyTopicEntityPrefix, eventClasses);
- List<String> topicClasses = new ArrayList<>(Arrays.asList(eventClasses.split("\\s*,\\s*")));
+ TopicCoderFilterConfiguration topic2Classes2Filters =
+ new TopicCoderFilterConfiguration(firstTopic, classes2Filters, customGsonCoder);
+ topics2DecodedClasses2Filters.add(topic2Classes2Filters);
+ }
- for (String theClass : topicClasses) {
+ return topics2DecodedClasses2Filters;
+ }
+ private String getPropertyTopicPrefix(Topic topic) {
+ boolean isSource = topic instanceof TopicSource;
+ CommInfrastructure commInfra = topic.getTopicCommInfrastructure();
+ if (commInfra == CommInfrastructure.UEB) {
+ if (isSource) {
+ return PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS + ".";
+ } else {
+ return PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS + ".";
+ }
+ } else if (commInfra == CommInfrastructure.DMAAP) {
+ if (isSource) {
+ return PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + ".";
+ } else {
+ return PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + ".";
+ }
+ } else if (commInfra == CommInfrastructure.NOOP) {
+ if (isSource) {
+ return PolicyEndPointProperties.PROPERTY_NOOP_SOURCE_TOPICS + ".";
+ } else {
+ return PolicyEndPointProperties.PROPERTY_NOOP_SINK_TOPICS + ".";
+ }
+ } else {
+ throw new IllegalArgumentException("Invalid Communication Infrastructure: " + commInfra);
+ }
+ }
+
+ private CustomGsonCoder getCustomCoder(Properties properties, String propertyPrefix) {
+ String customGson = properties.getProperty(propertyPrefix
+ + PolicyEndPointProperties.PROPERTY_TOPIC_EVENTS_CUSTOM_MODEL_CODER_GSON_SUFFIX);
+
+ CustomGsonCoder customGsonCoder = null;
+ if (customGson != null && !customGson.isEmpty()) {
+ try {
+ customGsonCoder = new CustomGsonCoder(customGson);
+ } catch (IllegalArgumentException e) {
+ logger.warn("{}: cannot create custom-gson-coder {} because of {}", this, customGson,
+ e.getMessage(), e);
+ }
+ }
+ return customGsonCoder;
+ }
- // 4. third, for each coder class, get the filter expression
+ private List<PotentialCoderFilter> getFilterExpressions(Properties properties, String propertyPrefix,
+ String eventClasses) {
- String filter = properties
- .getProperty(propertyTopicEntityPrefix + firstTopic
- + PolicyEndPointProperties.PROPERTY_TOPIC_EVENTS_SUFFIX
- + "." + theClass + PolicyEndPointProperties.PROPERTY_TOPIC_EVENTS_FILTER_SUFFIX);
+ List<PotentialCoderFilter> classes2Filters = new ArrayList<>();
- JsonProtocolFilter protocolFilter = new JsonProtocolFilter(filter);
- PotentialCoderFilter class2Filters = new PotentialCoderFilter(theClass, protocolFilter);
- classes2Filters.add(class2Filters);
- }
+ List<String> topicClasses = new ArrayList<>(Arrays.asList(eventClasses.split("\\s*,\\s*")));
- TopicCoderFilterConfiguration topic2Classes2Filters =
- new TopicCoderFilterConfiguration(firstTopic, classes2Filters, customGsonCoder);
- topics2DecodedClasses2Filters.add(topic2Classes2Filters);
+ for (String theClass : topicClasses) {
+
+ // 4. for each coder class, get the filter expression
+
+ String filter = properties
+ .getProperty(propertyPrefix
+ + PolicyEndPointProperties.PROPERTY_TOPIC_EVENTS_SUFFIX
+ + "." + theClass + PolicyEndPointProperties.PROPERTY_TOPIC_EVENTS_FILTER_SUFFIX);
+
+ JsonProtocolFilter protocolFilter = new JsonProtocolFilter(filter);
+ PotentialCoderFilter class2Filters = new PotentialCoderFilter(theClass, protocolFilter);
+ classes2Filters.add(class2Filters);
}
- return topics2DecodedClasses2Filters;
+ return classes2Filters;
}
@Override
diff --git a/policy-management/src/main/java/org/onap/policy/drools/controller/internal/MavenDroolsController.java b/policy-management/src/main/java/org/onap/policy/drools/controller/internal/MavenDroolsController.java
index ca1f2283..77bfcf9f 100644
--- a/policy-management/src/main/java/org/onap/policy/drools/controller/internal/MavenDroolsController.java
+++ b/policy-management/src/main/java/org/onap/policy/drools/controller/internal/MavenDroolsController.java
@@ -41,6 +41,7 @@ import org.kie.api.runtime.rule.QueryResultsRow;
import org.onap.policy.common.endpoints.event.comm.TopicSink;
import org.onap.policy.common.gson.annotation.GsonJsonIgnore;
import org.onap.policy.common.gson.annotation.GsonJsonProperty;
+import org.onap.policy.common.utils.services.FeatureApiUtils;
import org.onap.policy.common.utils.services.OrderedServiceImpl;
import org.onap.policy.drools.controller.DroolsController;
import org.onap.policy.drools.controller.DroolsControllerConstants;
@@ -186,25 +187,11 @@ public class MavenDroolsController implements DroolsController {
logger.info("updating version -> [{}:{}:{}]", newGroupId, newArtifactId, newVersion);
- if (newGroupId == null || newGroupId.isEmpty()) {
- throw new IllegalArgumentException("Missing maven group-id coordinate");
- }
+ validateText(newGroupId, "Missing maven group-id coordinate");
+ validateText(newArtifactId, "Missing maven artifact-id coordinate");
+ validateText(newVersion, "Missing maven version coordinate");
- if (newArtifactId == null || newArtifactId.isEmpty()) {
- throw new IllegalArgumentException("Missing maven artifact-id coordinate");
- }
-
- if (newVersion == null || newVersion.isEmpty()) {
- throw new IllegalArgumentException("Missing maven version coordinate");
- }
-
- if (newGroupId.equalsIgnoreCase(DroolsControllerConstants.NO_GROUP_ID)
- || newArtifactId.equalsIgnoreCase(DroolsControllerConstants.NO_ARTIFACT_ID)
- || newVersion.equalsIgnoreCase(DroolsControllerConstants.NO_VERSION)) {
- throw new IllegalArgumentException("BRAINLESS maven coordinates provided: "
- + newGroupId + ":" + newArtifactId + ":"
- + newVersion);
- }
+ validateHasBrain(newGroupId, newArtifactId, newVersion);
if (newGroupId.equalsIgnoreCase(this.getGroupId())
&& newArtifactId.equalsIgnoreCase(this.getArtifactId())
@@ -214,13 +201,7 @@ public class MavenDroolsController implements DroolsController {
return;
}
- if (!newGroupId.equalsIgnoreCase(this.getGroupId())
- || !newArtifactId.equalsIgnoreCase(this.getArtifactId())) {
- throw new IllegalArgumentException(
- "Group ID and Artifact ID maven coordinates must be identical for the upgrade: "
- + newGroupId + ":" + newArtifactId + ":"
- + newVersion + " vs. " + this);
- }
+ validateNewVersion(newGroupId, newArtifactId, newVersion);
/* upgrade */
String messages = this.policyContainer.updateToVersion(newVersion);
@@ -239,6 +220,32 @@ public class MavenDroolsController implements DroolsController {
logger.info("UPDATE-TO-VERSION: completed {}", this);
}
+ private void validateText(String text, String errorMessage) {
+ if (text == null || text.isEmpty()) {
+ throw new IllegalArgumentException(errorMessage);
+ }
+ }
+
+ private void validateHasBrain(String newGroupId, String newArtifactId, String newVersion) {
+ if (newGroupId.equalsIgnoreCase(DroolsControllerConstants.NO_GROUP_ID)
+ || newArtifactId.equalsIgnoreCase(DroolsControllerConstants.NO_ARTIFACT_ID)
+ || newVersion.equalsIgnoreCase(DroolsControllerConstants.NO_VERSION)) {
+ throw new IllegalArgumentException("BRAINLESS maven coordinates provided: "
+ + newGroupId + ":" + newArtifactId + ":"
+ + newVersion);
+ }
+ }
+
+ private void validateNewVersion(String newGroupId, String newArtifactId, String newVersion) {
+ if (!newGroupId.equalsIgnoreCase(this.getGroupId())
+ || !newArtifactId.equalsIgnoreCase(this.getArtifactId())) {
+ throw new IllegalArgumentException(
+ "Group ID and Artifact ID maven coordinates must be identical for the upgrade: "
+ + newGroupId + ":" + newArtifactId + ":"
+ + newVersion + " vs. " + this);
+ }
+ }
+
/**
* initialize decoders for all the topics supported by this controller
* Note this is critical to be done after the Policy Container is
@@ -259,18 +266,7 @@ public class MavenDroolsController implements DroolsController {
for (TopicCoderFilterConfiguration coderConfig: coderConfigurations) {
String topic = coderConfig.getTopic();
- CustomGsonCoder customGsonCoder = coderConfig.getCustomGsonCoder();
- if (customGsonCoder != null
- && customGsonCoder.getClassContainer() != null
- && !customGsonCoder.getClassContainer().isEmpty()) {
-
- String customGsonCoderClass = customGsonCoder.getClassContainer();
- if (!isClass(customGsonCoderClass)) {
- throw makeRetrieveEx(customGsonCoderClass);
- } else {
- logClassFetched(customGsonCoderClass);
- }
- }
+ CustomGsonCoder customGsonCoder = getCustomCoder(coderConfig);
List<PotentialCoderFilter> coderFilters = coderConfig.getCoderFilters();
if (coderFilters == null || coderFilters.isEmpty()) {
@@ -308,6 +304,22 @@ public class MavenDroolsController implements DroolsController {
}
}
+ private CustomGsonCoder getCustomCoder(TopicCoderFilterConfiguration coderConfig) {
+ CustomGsonCoder customGsonCoder = coderConfig.getCustomGsonCoder();
+ if (customGsonCoder != null
+ && customGsonCoder.getClassContainer() != null
+ && !customGsonCoder.getClassContainer().isEmpty()) {
+
+ String customGsonCoderClass = customGsonCoder.getClassContainer();
+ if (!isClass(customGsonCoderClass)) {
+ throw makeRetrieveEx(customGsonCoderClass);
+ } else {
+ logClassFetched(customGsonCoderClass);
+ }
+ }
+ return customGsonCoder;
+ }
+
/**
* Logs an error and makes an exception for an item that cannot be retrieved.
* @param itemName the item to retrieve
@@ -520,15 +532,11 @@ public class MavenDroolsController implements DroolsController {
// Broadcast
- for (DroolsControllerFeatureApi feature : getDroolsProviders().getList()) {
- try {
- if (feature.beforeInsert(this, event)) {
- return true;
- }
- } catch (Exception e) {
- logger.error("{}: feature {} before-insert failure because of {}",
- this, feature.getClass().getName(), e.getMessage(), e);
- }
+ if (FeatureApiUtils.apply(getDroolsProviders().getList(),
+ feature -> feature.beforeInsert(this, event),
+ (feature, ex) -> logger.error("{}: feature {} before-insert failure because of {}", this,
+ feature.getClass().getName(), ex.getMessage(), ex))) {
+ return true;
}
boolean successInject = this.policyContainer.insertAll(event);
@@ -536,16 +544,10 @@ public class MavenDroolsController implements DroolsController {
logger.warn(this + "Failed to inject into PolicyContainer {}", this.getSessionNames());
}
- for (DroolsControllerFeatureApi feature : getDroolsProviders().getList()) {
- try {
- if (feature.afterInsert(this, event, successInject)) {
- return true;
- }
- } catch (Exception e) {
- logger.error("{}: feature {} after-insert failure because of {}",
- this, feature.getClass().getName(), e.getMessage(), e);
- }
- }
+ FeatureApiUtils.apply(getDroolsProviders().getList(),
+ feature -> feature.afterInsert(this, event, successInject),
+ (feature, ex) -> logger.error("{}: feature {} after-insert failure because of {}", this,
+ feature.getClass().getName(), ex.getMessage(), ex));
return true;
@@ -840,18 +842,7 @@ public class MavenDroolsController implements DroolsController {
PolicySession session = getSession(sessionName);
KieSession kieSession = session.getKieSession();
- boolean found = false;
- for (KiePackage kiePackage : kieSession.getKieBase().getKiePackages()) {
- for (Query q : kiePackage.getQueries()) {
- if (q.getName() != null && q.getName().equals(queryName)) {
- found = true;
- break;
- }
- }
- }
- if (!found) {
- throw new IllegalArgumentException("Invalid Query Name: " + queryName);
- }
+ validateQueryName(kieSession, queryName);
List<Object> factObjects = new ArrayList<>();
@@ -870,6 +861,18 @@ public class MavenDroolsController implements DroolsController {
return factObjects;
}
+ private void validateQueryName(KieSession kieSession, String queryName) {
+ for (KiePackage kiePackage : kieSession.getKieBase().getKiePackages()) {
+ for (Query q : kiePackage.getQueries()) {
+ if (q.getName() != null && q.getName().equals(queryName)) {
+ return;
+ }
+ }
+ }
+
+ throw new IllegalArgumentException("Invalid Query Name: " + queryName);
+ }
+
@Override
public <T> boolean delete(@NonNull String sessionName, @NonNull T fact) {
String factClassName = fact.getClass().getName();
diff --git a/policy-management/src/main/java/org/onap/policy/drools/protocol/coders/GenericEventProtocolCoder.java b/policy-management/src/main/java/org/onap/policy/drools/protocol/coders/GenericEventProtocolCoder.java
index 89a7a420..cb4ce07e 100644
--- a/policy-management/src/main/java/org/onap/policy/drools/protocol/coders/GenericEventProtocolCoder.java
+++ b/policy-management/src/main/java/org/onap/policy/drools/protocol/coders/GenericEventProtocolCoder.java
@@ -124,40 +124,44 @@ abstract class GenericEventProtocolCoder {
coders.put(key, coderTools);
- if (reverseCoders.containsKey(reverseKey)) {
- // There is another controller (different group id/artifact id/topic)
- // that shares the class and the topic.
-
- List<ProtocolCoderToolset> toolsets =
- reverseCoders.get(reverseKey);
- boolean present = false;
- for (ProtocolCoderToolset parserSet : toolsets) {
- // just doublecheck
- present = parserSet.getControllerId().equals(key);
- if (present) {
- /* anomaly */
- logger.error(
- "{}: unexpected toolset reverse mapping found for {}:{}: {}",
- this,
- reverseKey,
- key,
- parserSet);
- }
- }
+ addReverseCoder(coderTools, key, reverseKey);
+ }
+ }
+ private void addReverseCoder(GsonProtocolCoderToolset coderTools, String key, String reverseKey) {
+ if (reverseCoders.containsKey(reverseKey)) {
+ // There is another controller (different group id/artifact id/topic)
+ // that shares the class and the topic.
+
+ List<ProtocolCoderToolset> toolsets =
+ reverseCoders.get(reverseKey);
+ boolean present = false;
+ for (ProtocolCoderToolset parserSet : toolsets) {
+ // just doublecheck
+ present = parserSet.getControllerId().equals(key);
if (present) {
- return;
- } else {
- logger.info("{}: adding coder set for {}: {} ", this, reverseKey, coderTools);
- toolsets.add(coderTools);
+ /* anomaly */
+ logger.error(
+ "{}: unexpected toolset reverse mapping found for {}:{}: {}",
+ this,
+ reverseKey,
+ key,
+ parserSet);
}
+ }
+
+ if (present) {
+ return;
} else {
- List<ProtocolCoderToolset> toolsets = new ArrayList<>();
+ logger.info("{}: adding coder set for {}: {} ", this, reverseKey, coderTools);
toolsets.add(coderTools);
-
- logger.info("{}: adding toolset for reverse key {}: {}", this, reverseKey, toolsets);
- reverseCoders.put(reverseKey, toolsets);
}
+ } else {
+ List<ProtocolCoderToolset> toolsets = new ArrayList<>();
+ toolsets.add(coderTools);
+
+ logger.info("{}: adding toolset for reverse key {}: {}", this, reverseKey, toolsets);
+ reverseCoders.put(reverseKey, toolsets);
}
}
@@ -217,30 +221,36 @@ abstract class GenericEventProtocolCoder {
for (CoderFilters codeFilter : coderToolset.getCoders()) {
String className = codeFilter.getCodedClass();
String reverseKey = this.reverseCodersKey(topic, className);
- if (this.reverseCoders.containsKey(reverseKey)) {
- List<ProtocolCoderToolset> toolsets =
- this.reverseCoders.get(reverseKey);
- Iterator<ProtocolCoderToolset> toolsetsIter =
- toolsets.iterator();
- while (toolsetsIter.hasNext()) {
- ProtocolCoderToolset toolset = toolsetsIter.next();
- if (toolset.getControllerId().equals(key)) {
- logger.info(
- "{}: removed coder from toolset for {} from reverse mapping", this, reverseKey);
- toolsetsIter.remove();
- }
- }
-
- if (this.reverseCoders.get(reverseKey).isEmpty()) {
- logger.info("{}: removing reverse mapping for {}: ", this, reverseKey);
- this.reverseCoders.remove(reverseKey);
- }
- }
+ removeReverseCoder(key, reverseKey);
}
}
}
}
+ private void removeReverseCoder(String key, String reverseKey) {
+ if (!this.reverseCoders.containsKey(reverseKey)) {
+ return;
+ }
+
+ List<ProtocolCoderToolset> toolsets =
+ this.reverseCoders.get(reverseKey);
+ Iterator<ProtocolCoderToolset> toolsetsIter =
+ toolsets.iterator();
+ while (toolsetsIter.hasNext()) {
+ ProtocolCoderToolset toolset = toolsetsIter.next();
+ if (toolset.getControllerId().equals(key)) {
+ logger.info(
+ "{}: removed coder from toolset for {} from reverse mapping", this, reverseKey);
+ toolsetsIter.remove();
+ }
+ }
+
+ if (this.reverseCoders.get(reverseKey).isEmpty()) {
+ logger.info("{}: removing reverse mapping for {}: ", this, reverseKey);
+ this.reverseCoders.remove(reverseKey);
+ }
+ }
+
/**
* does it support coding.
*
@@ -446,20 +456,7 @@ abstract class GenericEventProtocolCoder {
}
for (ProtocolCoderToolset encoderSet : toolsets) {
- // figure out the right toolset
- String groupId = encoderSet.getGroupId();
- String artifactId = encoderSet.getArtifactId();
- List<CoderFilters> coderFilters = encoderSet.getCoders();
- for (CoderFilters coder : coderFilters) {
- if (coder.getCodedClass().equals(encodedClass.getClass().getName())) {
- DroolsController droolsController =
- DroolsControllerConstants.getFactory().get(groupId, artifactId, "");
- if (droolsController.ownsCoder(
- encodedClass.getClass(), coder.getModelClassLoaderHash())) {
- droolsControllers.add(droolsController);
- }
- }
- }
+ addToolsetControllers(droolsControllers, encodedClass, encoderSet);
}
if (droolsControllers.isEmpty()) {
@@ -473,6 +470,24 @@ abstract class GenericEventProtocolCoder {
return droolsControllers;
}
+ private void addToolsetControllers(List<DroolsController> droolsControllers, Object encodedClass,
+ ProtocolCoderToolset encoderSet) {
+ // figure out the right toolset
+ String groupId = encoderSet.getGroupId();
+ String artifactId = encoderSet.getArtifactId();
+ List<CoderFilters> coderFilters = encoderSet.getCoders();
+ for (CoderFilters coder : coderFilters) {
+ if (coder.getCodedClass().equals(encodedClass.getClass().getName())) {
+ DroolsController droolsController =
+ DroolsControllerConstants.getFactory().get(groupId, artifactId, "");
+ if (droolsController.ownsCoder(
+ encodedClass.getClass(), coder.getModelClassLoaderHash())) {
+ droolsControllers.add(droolsController);
+ }
+ }
+ }
+ }
+
/**
* get all filters by maven coordinates and topic.
*
diff --git a/policy-management/src/main/java/org/onap/policy/drools/system/PolicyController.java b/policy-management/src/main/java/org/onap/policy/drools/system/PolicyController.java
index 17247f41..82cd015e 100644
--- a/policy-management/src/main/java/org/onap/policy/drools/system/PolicyController.java
+++ b/policy-management/src/main/java/org/onap/policy/drools/system/PolicyController.java
@@ -48,12 +48,12 @@ public interface PolicyController extends Startable, Lockable {
/**
* Get the topic readers of interest for this controller.
*/
- List<? extends TopicSource> getTopicSources();
+ List<TopicSource> getTopicSources();
/**
* Get the topic readers of interest for this controller.
*/
- List<? extends TopicSink> getTopicSinks();
+ List<TopicSink> getTopicSinks();
/**
* Get the Drools Controller.
diff --git a/policy-management/src/main/java/org/onap/policy/drools/system/PolicyEngineManager.java b/policy-management/src/main/java/org/onap/policy/drools/system/PolicyEngineManager.java
index 36d8ca59..32e3f674 100644
--- a/policy-management/src/main/java/org/onap/policy/drools/system/PolicyEngineManager.java
+++ b/policy-management/src/main/java/org/onap/policy/drools/system/PolicyEngineManager.java
@@ -24,6 +24,7 @@ import static org.onap.policy.drools.system.PolicyEngineConstants.TELEMETRY_SERV
import static org.onap.policy.drools.system.PolicyEngineConstants.TELEMETRY_SERVER_DEFAULT_NAME;
import static org.onap.policy.drools.system.PolicyEngineConstants.TELEMETRY_SERVER_DEFAULT_PORT;
+import com.att.aft.dme2.internal.apache.commons.lang3.StringUtils;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.gson.Gson;
@@ -31,6 +32,11 @@ import com.google.gson.GsonBuilder;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+import java.util.stream.Stream;
+import lombok.Getter;
import org.onap.policy.common.endpoints.event.comm.Topic;
import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
import org.onap.policy.common.endpoints.event.comm.TopicEndpoint;
@@ -43,6 +49,7 @@ import org.onap.policy.common.endpoints.http.server.HttpServletServerFactoryInst
import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
import org.onap.policy.common.gson.annotation.GsonJsonIgnore;
import org.onap.policy.common.gson.annotation.GsonJsonProperty;
+import org.onap.policy.common.utils.services.FeatureApiUtils;
import org.onap.policy.drools.controller.DroolsController;
import org.onap.policy.drools.controller.DroolsControllerConstants;
import org.onap.policy.drools.core.PolicyContainer;
@@ -89,11 +96,13 @@ class PolicyEngineManager implements PolicyEngine {
/**
* Is the Policy Engine running.
*/
+ @Getter
private volatile boolean alive = false;
/**
* Is the engine locked.
*/
+ @Getter
private volatile boolean locked = false;
/**
@@ -109,16 +118,19 @@ class PolicyEngineManager implements PolicyEngine {
/**
* Policy Engine Sources.
*/
- private List<? extends TopicSource> sources = new ArrayList<>();
+ @Getter
+ private List<TopicSource> sources = new ArrayList<>();
/**
* Policy Engine Sinks.
*/
- private List<? extends TopicSink> sinks = new ArrayList<>();
+ @Getter
+ private List<TopicSink> sinks = new ArrayList<>();
/**
* Policy Engine HTTP Servers.
*/
+ @Getter
private List<HttpServletServer> httpServers = new ArrayList<>();
/**
@@ -130,15 +142,11 @@ class PolicyEngineManager implements PolicyEngine {
@Override
public synchronized void boot(String[] cliArgs) {
- for (final PolicyEngineFeatureApi feature : getEngineProviders()) {
- try {
- if (feature.beforeBoot(this, cliArgs)) {
- return;
- }
- } catch (final Exception e) {
- logger.error("{}: feature {} before-boot failure because of {}", this, feature.getClass().getName(),
- e.getMessage(), e);
- }
+ if (FeatureApiUtils.apply(getEngineProviders(),
+ feature -> feature.beforeBoot(this, cliArgs),
+ (feature, ex) -> logger.error("{}: feature {} before-boot failure because of {}", this,
+ feature.getClass().getName(), ex.getMessage(), ex))) {
+ return;
}
try {
@@ -147,16 +155,10 @@ class PolicyEngineManager implements PolicyEngine {
logger.error("{}: cannot init policy-container because of {}", this, e.getMessage(), e);
}
- for (final PolicyEngineFeatureApi feature : getEngineProviders()) {
- try {
- if (feature.afterBoot(this)) {
- return;
- }
- } catch (final Exception e) {
- logger.error("{}: feature {} after-boot failure because of {}", this, feature.getClass().getName(),
- e.getMessage(), e);
- }
- }
+ FeatureApiUtils.apply(getEngineProviders(),
+ feature -> feature.afterBoot(this),
+ (feature, ex) -> logger.error("{}: feature {} after-boot failure because of {}", this,
+ feature.getClass().getName(), ex.getMessage(), ex));
}
@Override
@@ -220,15 +222,11 @@ class PolicyEngineManager implements PolicyEngine {
}
/* policy-engine dispatch pre configure hook */
- for (final PolicyEngineFeatureApi feature : getEngineProviders()) {
- try {
- if (feature.beforeConfigure(this, properties)) {
- return;
- }
- } catch (final Exception e) {
- logger.error("{}: feature {} before-configure failure because of {}", this,
- feature.getClass().getName(), e.getMessage(), e);
- }
+ if (FeatureApiUtils.apply(getEngineProviders(),
+ feature -> feature.beforeConfigure(this, properties),
+ (feature, ex) -> logger.error("{}: feature {} before-configure failure because of {}", this,
+ feature.getClass().getName(), ex.getMessage(), ex))) {
+ return;
}
this.properties = properties;
@@ -260,16 +258,10 @@ class PolicyEngineManager implements PolicyEngine {
}
/* policy-engine dispatch post configure hook */
- for (final PolicyEngineFeatureApi feature : getEngineProviders()) {
- try {
- if (feature.afterConfigure(this)) {
- return;
- }
- } catch (final Exception e) {
- logger.error("{}: feature {} after-configure failure because of {}", this, feature.getClass().getName(),
- e.getMessage(), e);
- }
- }
+ FeatureApiUtils.apply(getEngineProviders(),
+ feature -> feature.afterConfigure(this),
+ (feature, ex) -> logger.error("{}: feature {} after-configure failure because of {}", this,
+ feature.getClass().getName(), ex.getMessage(), ex));
}
@Override
@@ -337,16 +329,11 @@ class PolicyEngineManager implements PolicyEngine {
}
// feature hook
- for (final PolicyControllerFeatureApi controllerFeature : getControllerProviders()) {
- try {
- if (controllerFeature.afterCreate(controller)) {
- return controller;
- }
- } catch (final Exception e) {
- logger.error("{}: feature {} after-controller-create failure because of {}", this,
- controllerFeature.getClass().getName(), e.getMessage(), e);
- }
- }
+ PolicyController controller2 = controller;
+ FeatureApiUtils.apply(getControllerProviders(),
+ feature -> feature.afterCreate(controller2),
+ (feature, ex) -> logger.error("{}: feature {} after-controller-create failure because of {}",
+ this, feature.getClass().getName(), ex.getMessage(), ex));
return controller;
}
@@ -393,7 +380,6 @@ class PolicyEngineManager implements PolicyEngine {
throw new IllegalArgumentException("No controller configuration has been provided");
}
- PolicyController policyController = null;
try {
final String operation = configController.getOperation();
if (operation == null || operation.isEmpty()) {
@@ -401,75 +387,14 @@ class PolicyEngineManager implements PolicyEngine {
throw new IllegalArgumentException("operation must be provided");
}
- try {
- policyController = getControllerFactory().get(controllerName);
- } catch (final IllegalArgumentException e) {
- // not found
- logger.warn("Policy Controller " + controllerName + " not found", e);
- }
-
+ PolicyController policyController = getController(controllerName);
if (policyController == null) {
-
- if (operation.equalsIgnoreCase(ControllerConfiguration.CONFIG_CONTROLLER_OPERATION_LOCK)
- || operation.equalsIgnoreCase(ControllerConfiguration.CONFIG_CONTROLLER_OPERATION_UNLOCK)) {
- throw new IllegalArgumentException(controllerName + " is not available for operation " + operation);
- }
-
- /* Recovery case */
-
- logger.warn("controller {} does not exist. Attempting recovery from disk", controllerName);
-
- final Properties controllerProperties =
- getPersistenceManager().getControllerProperties(controllerName);
-
- /*
- * returned properties cannot be null (per implementation) assert (properties !=
- * null)
- */
-
- if (controllerProperties == null) {
- throw new IllegalArgumentException(controllerName + " is invalid");
- }
-
- logger.warn("controller being recovered. {} Reset controller's bad maven coordinates to brainless",
- controllerName);
-
- /*
- * try to bring up bad controller in brainless mode, after having it
- * working, apply the new create/update operation.
- */
- controllerProperties.setProperty(DroolsPropertyConstants.RULES_GROUPID,
- DroolsControllerConstants.NO_GROUP_ID);
- controllerProperties.setProperty(DroolsPropertyConstants.RULES_ARTIFACTID,
- DroolsControllerConstants.NO_ARTIFACT_ID);
- controllerProperties.setProperty(DroolsPropertyConstants.RULES_VERSION,
- DroolsControllerConstants.NO_VERSION);
-
- policyController = getPolicyEngine().createPolicyController(controllerName, controllerProperties);
+ policyController = findController(controllerName, operation);
/* fall through to do brain update operation */
}
- switch (operation) {
- case ControllerConfiguration.CONFIG_CONTROLLER_OPERATION_CREATE:
- getControllerFactory().patch(policyController, configController.getDrools());
- break;
- case ControllerConfiguration.CONFIG_CONTROLLER_OPERATION_UPDATE:
- policyController.unlock();
- getControllerFactory().patch(policyController, configController.getDrools());
- break;
- case ControllerConfiguration.CONFIG_CONTROLLER_OPERATION_LOCK:
- policyController.lock();
- break;
- case ControllerConfiguration.CONFIG_CONTROLLER_OPERATION_UNLOCK:
- policyController.unlock();
- break;
- default:
- final String msg = "Controller Operation Configuration is not supported: " + operation + " for "
- + controllerName;
- logger.warn(msg);
- throw new IllegalArgumentException(msg);
- }
+ updateController(controllerName, policyController, operation, configController);
return policyController;
} catch (final Exception e) {
@@ -481,84 +406,135 @@ class PolicyEngineManager implements PolicyEngine {
}
}
+ private PolicyController getController(final String controllerName) {
+ PolicyController policyController = null;
+ try {
+ policyController = getControllerFactory().get(controllerName);
+ } catch (final IllegalArgumentException e) {
+ // not found
+ logger.warn("Policy Controller " + controllerName + " not found", e);
+ }
+ return policyController;
+ }
+
+ private PolicyController findController(final String controllerName, final String operation) {
+ if (operation.equalsIgnoreCase(ControllerConfiguration.CONFIG_CONTROLLER_OPERATION_LOCK)
+ || operation.equalsIgnoreCase(ControllerConfiguration.CONFIG_CONTROLLER_OPERATION_UNLOCK)) {
+ throw new IllegalArgumentException(controllerName + " is not available for operation " + operation);
+ }
+
+ /* Recovery case */
+
+ logger.warn("controller {} does not exist. Attempting recovery from disk", controllerName);
+
+ final Properties controllerProperties =
+ getPersistenceManager().getControllerProperties(controllerName);
+
+ /*
+ * returned properties cannot be null (per implementation) assert (properties !=
+ * null)
+ */
+
+ if (controllerProperties == null) {
+ throw new IllegalArgumentException(controllerName + " is invalid");
+ }
+
+ logger.warn("controller being recovered. {} Reset controller's bad maven coordinates to brainless",
+ controllerName);
+
+ /*
+ * try to bring up bad controller in brainless mode, after having it
+ * working, apply the new create/update operation.
+ */
+ controllerProperties.setProperty(DroolsPropertyConstants.RULES_GROUPID,
+ DroolsControllerConstants.NO_GROUP_ID);
+ controllerProperties.setProperty(DroolsPropertyConstants.RULES_ARTIFACTID,
+ DroolsControllerConstants.NO_ARTIFACT_ID);
+ controllerProperties.setProperty(DroolsPropertyConstants.RULES_VERSION,
+ DroolsControllerConstants.NO_VERSION);
+
+ return getPolicyEngine().createPolicyController(controllerName, controllerProperties);
+ }
+
+ private void updateController(final String controllerName, PolicyController policyController,
+ final String operation, ControllerConfiguration configController) {
+ switch (operation) {
+ case ControllerConfiguration.CONFIG_CONTROLLER_OPERATION_CREATE:
+ getControllerFactory().patch(policyController, configController.getDrools());
+ break;
+ case ControllerConfiguration.CONFIG_CONTROLLER_OPERATION_UPDATE:
+ policyController.unlock();
+ getControllerFactory().patch(policyController, configController.getDrools());
+ break;
+ case ControllerConfiguration.CONFIG_CONTROLLER_OPERATION_LOCK:
+ policyController.lock();
+ break;
+ case ControllerConfiguration.CONFIG_CONTROLLER_OPERATION_UNLOCK:
+ policyController.unlock();
+ break;
+ default:
+ final String msg = "Controller Operation Configuration is not supported: " + operation + " for "
+ + controllerName;
+ logger.warn(msg);
+ throw new IllegalArgumentException(msg);
+ }
+ }
+
@Override
public synchronized boolean start() {
/* policy-engine dispatch pre start hook */
- for (final PolicyEngineFeatureApi feature : getEngineProviders()) {
- try {
- if (feature.beforeStart(this)) {
- return true;
- }
- } catch (final Exception e) {
- logger.error("{}: feature {} before-start failure because of {}", this, feature.getClass().getName(),
- e.getMessage(), e);
- }
+ if (FeatureApiUtils.apply(getEngineProviders(),
+ feature -> feature.beforeStart(this),
+ (feature, ex) -> logger.error("{}: feature {} before-start failure because of {}", this,
+ feature.getClass().getName(), ex.getMessage(), ex))) {
+ return true;
}
- boolean success = true;
if (this.locked) {
throw new IllegalStateException(ENGINE_LOCKED_MSG);
}
this.alive = true;
+ AtomicReference<Boolean> success = new AtomicReference<>(true);
+
/* Start Policy Engine exclusively-owned (unmanaged) http servers */
- for (final HttpServletServer httpServer : this.httpServers) {
- try {
- if (!httpServer.waitedStart(10 * 1000L)) {
- success = false;
- }
- } catch (final Exception e) {
- logger.error("{}: cannot start http-server {} because of {}", this, httpServer, e.getMessage(), e);
- }
- }
+ attempt(success, this.httpServers,
+ httpServer -> httpServer.waitedStart(10 * 1000L),
+ (item, ex) -> logger.error("{}: cannot start http-server {} because of {}",
+ this, item, ex.getMessage(), ex));
/* Start Policy Engine exclusively-owned (unmanaged) sources */
- for (final TopicSource source : this.sources) {
- try {
- if (!source.start()) {
- success = false;
- }
- } catch (final Exception e) {
- logger.error("{}: cannot start topic-source {} because of {}", this, source, e.getMessage(), e);
- }
- }
+ attempt(success, this.sources,
+ TopicSource::start,
+ (item, ex) -> logger.error("{}: cannot start topic-source {} because of {}",
+ this, item, ex.getMessage(), ex));
/* Start Policy Engine owned (unmanaged) sinks */
- for (final TopicSink sink : this.sinks) {
- try {
- if (!sink.start()) {
- success = false;
- }
- } catch (final Exception e) {
- logger.error("{}: cannot start topic-sink {} because of {}", this, sink, e.getMessage(), e);
- }
- }
+ attempt(success, this.sinks,
+ TopicSink::start,
+ (item, ex) -> logger.error("{}: cannot start topic-sink {} because of {}",
+ this, item, ex.getMessage(), ex));
/* Start Policy Controllers */
- final List<PolicyController> controllers = getControllerFactory().inventory();
- for (final PolicyController controller : controllers) {
- try {
- if (!controller.start()) {
- success = false;
- }
- } catch (final Exception e) {
- logger.error("{}: cannot start policy-controller {} because of {}", this, controller, e.getMessage(),
- e);
- success = false;
- }
- }
+ attempt(success, getControllerFactory().inventory(),
+ PolicyController::start,
+ (item, ex) -> {
+ logger.error("{}: cannot start policy-controller {} because of {}", this, item,
+ ex.getMessage(), ex);
+ success.set(false);
+ });
/* Start managed Topic Endpoints */
try {
if (!getTopicEndpointManager().start()) {
- success = false;
+ success.set(false);
}
} catch (final IllegalStateException e) {
logger.warn("{}: Topic Endpoint Manager is in an invalid state because of {}", this, e.getMessage(), e);
@@ -570,109 +546,80 @@ class PolicyEngineManager implements PolicyEngine {
startPdpJmxListener();
/* policy-engine dispatch after start hook */
- for (final PolicyEngineFeatureApi feature : getEngineProviders()) {
- try {
- if (feature.afterStart(this)) {
- return success;
- }
- } catch (final Exception e) {
- logger.error("{}: feature {} after-start failure because of {}", this, feature.getClass().getName(),
- e.getMessage(), e);
- }
- }
+ FeatureApiUtils.apply(getEngineProviders(),
+ feature -> feature.afterStart(this),
+ (feature, ex) -> logger.error("{}: feature {} after-start failure because of {}", this,
+ feature.getClass().getName(), ex.getMessage(), ex));
- return success;
+ return success.get();
+ }
+
+ @FunctionalInterface
+ private static interface PredicateWithEx<T> {
+ public boolean test(T value) throws InterruptedException;
}
@Override
public synchronized boolean stop() {
/* policy-engine dispatch pre stop hook */
- for (final PolicyEngineFeatureApi feature : getEngineProviders()) {
- try {
- if (feature.beforeStop(this)) {
- return true;
- }
- } catch (final Exception e) {
- logger.error("{}: feature {} before-stop failure because of {}", this, feature.getClass().getName(),
- e.getMessage(), e);
- }
+ if (FeatureApiUtils.apply(getEngineProviders(),
+ feature -> feature.beforeStop(this),
+ (feature, ex) -> logger.error("{}: feature {} before-stop failure because of {}", this,
+ feature.getClass().getName(), ex.getMessage(), ex))) {
+ return true;
}
/* stop regardless of the lock state */
- boolean success = true;
if (!this.alive) {
return true;
}
this.alive = false;
- final List<PolicyController> controllers = getControllerFactory().inventory();
- for (final PolicyController controller : controllers) {
- try {
- if (!controller.stop()) {
- success = false;
- }
- } catch (final Exception e) {
- logger.error("{}: cannot stop policy-controller {} because of {}", this, controller, e.getMessage(), e);
- success = false;
- }
- }
+ AtomicReference<Boolean> success = new AtomicReference<>(true);
+
+ attempt(success, getControllerFactory().inventory(),
+ PolicyController::stop,
+ (item, ex) -> {
+ logger.error("{}: cannot stop policy-controller {} because of {}", this, item,
+ ex.getMessage(), ex);
+ success.set(false);
+ });
/* Stop Policy Engine owned (unmanaged) sources */
- for (final TopicSource source : this.sources) {
- try {
- if (!source.stop()) {
- success = false;
- }
- } catch (final Exception e) {
- logger.error("{}: cannot start topic-source {} because of {}", this, source, e.getMessage(), e);
- }
- }
+ attempt(success, this.sources,
+ TopicSource::stop,
+ (item, ex) -> logger.error("{}: cannot stop topic-source {} because of {}", this, item,
+ ex.getMessage(), ex));
/* Stop Policy Engine owned (unmanaged) sinks */
- for (final TopicSink sink : this.sinks) {
- try {
- if (!sink.stop()) {
- success = false;
- }
- } catch (final Exception e) {
- logger.error("{}: cannot start topic-sink {} because of {}", this, sink, e.getMessage(), e);
- }
- }
+ attempt(success, this.sinks,
+ TopicSink::stop,
+ (item, ex) -> logger.error("{}: cannot stop topic-sink {} because of {}", this, item,
+ ex.getMessage(), ex));
/* stop all managed topics sources and sinks */
if (!getTopicEndpointManager().stop()) {
- success = false;
+ success.set(false);
}
/* stop all unmanaged http servers */
- for (final HttpServletServer httpServer : this.httpServers) {
- try {
- if (!httpServer.stop()) {
- success = false;
- }
- } catch (final Exception e) {
- logger.error("{}: cannot start http-server {} because of {}", this, httpServer, e.getMessage(), e);
- }
- }
+ attempt(success, this.httpServers,
+ HttpServletServer::stop,
+ (item, ex) -> logger.error("{}: cannot stop http-server {} because of {}", this, item,
+ ex.getMessage(), ex));
// stop JMX?
/* policy-engine dispatch pre stop hook */
- for (final PolicyEngineFeatureApi feature : getEngineProviders()) {
- try {
- if (feature.afterStop(this)) {
- return success;
- }
- } catch (final Exception e) {
- logger.error("{}: feature {} after-stop failure because of {}", this, feature.getClass().getName(),
- e.getMessage(), e);
- }
- }
+ FeatureApiUtils.apply(getEngineProviders(),
+ feature -> feature.afterStop(this),
+ (feature, ex) -> logger.error("{}: feature {} after-stop failure because of {}", this,
+ feature.getClass().getName(), ex.getMessage(), ex));
- return success;
+ return success.get();
}
@Override
@@ -687,36 +634,26 @@ class PolicyEngineManager implements PolicyEngine {
exitThread.start();
/* policy-engine dispatch pre shutdown hook */
- for (final PolicyEngineFeatureApi feature : getEngineProviders()) {
- try {
- if (feature.beforeShutdown(this)) {
- return;
- }
- } catch (final Exception e) {
- logger.error("{}: feature {} before-shutdown failure because of {}", this, feature.getClass().getName(),
- e.getMessage(), e);
- }
+ if (FeatureApiUtils.apply(getEngineProviders(),
+ feature -> feature.beforeShutdown(this),
+ (feature, ex) -> logger.error("{}: feature {} before-shutdown failure because of {}", this,
+ feature.getClass().getName(), ex.getMessage(), ex))) {
+ return;
}
this.alive = false;
/* Shutdown Policy Engine owned (unmanaged) sources */
- for (final TopicSource source : this.sources) {
- try {
- source.shutdown();
- } catch (final Exception e) {
- logger.error("{}: cannot shutdown topic-source {} because of {}", this, source, e.getMessage(), e);
- }
- }
+ applyAll(this.sources,
+ TopicSource::shutdown,
+ (item, ex) -> logger.error("{}: cannot shutdown topic-source {} because of {}", this, item,
+ ex.getMessage(), ex));
/* Shutdown Policy Engine owned (unmanaged) sinks */
- for (final TopicSink sink : this.sinks) {
- try {
- sink.shutdown();
- } catch (final Exception e) {
- logger.error("{}: cannot shutdown topic-sink {} because of {}", this, sink, e.getMessage(), e);
- }
- }
+ applyAll(this.sinks,
+ TopicSink::shutdown,
+ (item, ex) -> logger.error("{}: cannot shutdown topic-sink {} because of {}", this, item,
+ ex.getMessage(), ex));
/* Shutdown managed resources */
getControllerFactory().shutdown();
@@ -728,19 +665,45 @@ class PolicyEngineManager implements PolicyEngine {
stopPdpJmxListener();
/* policy-engine dispatch post shutdown hook */
- for (final PolicyEngineFeatureApi feature : getEngineProviders()) {
+ FeatureApiUtils.apply(getEngineProviders(),
+ feature -> feature.afterShutdown(this),
+ (feature, ex) -> logger.error("{}: feature {} after-shutdown failure because of {}", this,
+ feature.getClass().getName(), ex.getMessage(), ex));
+
+ exitThread.interrupt();
+ logger.info("{}: normal termination", this);
+ }
+
+ private <T> void attempt(AtomicReference<Boolean> success, List<T> items, PredicateWithEx<T> pred,
+ BiConsumer<T,Exception> handleEx) {
+
+ for (T item : items) {
try {
- if (feature.afterShutdown(this)) {
- return;
+ if (!pred.test(item)) {
+ success.set(false);
}
- } catch (final Exception e) {
- logger.error("{}: feature {} after-shutdown failure because of {}", this, feature.getClass().getName(),
- e.getMessage(), e);
+
+ } catch (InterruptedException ex) {
+ handleEx.accept(item, ex);
+ Thread.currentThread().interrupt();
+
+ } catch (RuntimeException ex) {
+ handleEx.accept(item, ex);
}
}
+ }
- exitThread.interrupt();
- logger.info("{}: normal termination", this);
+ private <T> void applyAll(List<T> items, Consumer<T> function,
+ BiConsumer<T,Exception> handleEx) {
+
+ for (T item : items) {
+ try {
+ function.accept(item);
+
+ } catch (RuntimeException ex) {
+ handleEx.accept(item, ex);
+ }
+ }
}
/**
@@ -764,14 +727,10 @@ class PolicyEngineManager implements PolicyEngine {
/*
* shut down the Policy Engine owned http servers as the very last thing
*/
- for (final HttpServletServer httpServer : PolicyEngineManager.this.getHttpServers()) {
- try {
- httpServer.shutdown();
- } catch (final Exception e) {
- logger.error("{}: cannot shutdown http-server {} because of {}", PolicyEngineManager.this,
- httpServer, e.getMessage(), e);
- }
- }
+ applyAll(PolicyEngineManager.this.getHttpServers(),
+ HttpServletServer::shutdown,
+ (item, ex) -> logger.error("{}: cannot shutdown http-server {} because of {}", this, item,
+ ex.getMessage(), ex));
logger.info("{}: exit", PolicyEngineManager.this);
doExit(0);
@@ -790,23 +749,14 @@ class PolicyEngineManager implements PolicyEngine {
}
@Override
- public boolean isAlive() {
- return this.alive;
- }
-
- @Override
public synchronized boolean lock() {
/* policy-engine dispatch pre lock hook */
- for (final PolicyEngineFeatureApi feature : getEngineProviders()) {
- try {
- if (feature.beforeLock(this)) {
- return true;
- }
- } catch (final Exception e) {
- logger.error("{}: feature {} before-lock failure because of {}", this, feature.getClass().getName(),
- e.getMessage(), e);
- }
+ if (FeatureApiUtils.apply(getEngineProviders(),
+ feature -> feature.beforeLock(this),
+ (feature, ex) -> logger.error("{}: feature {} before-lock failure because of {}", this,
+ feature.getClass().getName(), ex.getMessage(), ex))) {
+ return true;
}
if (this.locked) {
@@ -829,16 +779,10 @@ class PolicyEngineManager implements PolicyEngine {
success = getTopicEndpointManager().lock() && success;
/* policy-engine dispatch post lock hook */
- for (final PolicyEngineFeatureApi feature : getEngineProviders()) {
- try {
- if (feature.afterLock(this)) {
- return success;
- }
- } catch (final Exception e) {
- logger.error("{}: feature {} after-lock failure because of {}", this, feature.getClass().getName(),
- e.getMessage(), e);
- }
- }
+ FeatureApiUtils.apply(getEngineProviders(),
+ feature -> feature.afterLock(this),
+ (feature, ex) -> logger.error("{}: feature {} after-lock failure because of {}", this,
+ feature.getClass().getName(), ex.getMessage(), ex));
return success;
}
@@ -847,15 +791,11 @@ class PolicyEngineManager implements PolicyEngine {
public synchronized boolean unlock() {
/* policy-engine dispatch pre unlock hook */
- for (final PolicyEngineFeatureApi feature : getEngineProviders()) {
- try {
- if (feature.beforeUnlock(this)) {
- return true;
- }
- } catch (final Exception e) {
- logger.error("{}: feature {} before-unlock failure because of {}", this, feature.getClass().getName(),
- e.getMessage(), e);
- }
+ if (FeatureApiUtils.apply(getEngineProviders(),
+ feature -> feature.beforeUnlock(this),
+ (feature, ex) -> logger.error("{}: feature {} before-unlock failure because of {}", this,
+ feature.getClass().getName(), ex.getMessage(), ex))) {
+ return true;
}
if (!this.locked) {
@@ -879,26 +819,15 @@ class PolicyEngineManager implements PolicyEngine {
success = getTopicEndpointManager().unlock() && success;
/* policy-engine dispatch after unlock hook */
- for (final PolicyEngineFeatureApi feature : getEngineProviders()) {
- try {
- if (feature.afterUnlock(this)) {
- return success;
- }
- } catch (final Exception e) {
- logger.error("{}: feature {} after-unlock failure because of {}", this, feature.getClass().getName(),
- e.getMessage(), e);
- }
- }
+ FeatureApiUtils.apply(getEngineProviders(),
+ feature -> feature.afterUnlock(this),
+ (feature, ex) -> logger.error("{}: feature {} after-unlock failure because of {}", this,
+ feature.getClass().getName(), ex.getMessage(), ex));
return success;
}
@Override
- public boolean isLocked() {
- return this.locked;
- }
-
- @Override
public void removePolicyController(String name) {
getControllerFactory().destroy(name);
}
@@ -933,24 +862,6 @@ class PolicyEngineManager implements PolicyEngine {
return this.properties;
}
-
- @SuppressWarnings("unchecked")
- @Override
- public List<TopicSource> getSources() {
- return (List<TopicSource>) this.sources;
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public List<TopicSink> getSinks() {
- return (List<TopicSink>) this.sinks;
- }
-
- @Override
- public List<HttpServletServer> getHttpServers() {
- return this.httpServers;
- }
-
@Override
public List<String> getFeatures() {
final List<String> features = new ArrayList<>();
@@ -985,15 +896,12 @@ class PolicyEngineManager implements PolicyEngine {
@Override
public void onTopicEvent(CommInfrastructure commType, String topic, String event) {
/* policy-engine pre topic event hook */
- for (final PolicyEngineFeatureApi feature : getFeatureProviders()) {
- try {
- if (feature.beforeOnTopicEvent(this, commType, topic, event)) {
- return;
- }
- } catch (final Exception e) {
- logger.error("{}: feature {} beforeOnTopicEvent failure on event {} because of {}", this,
- feature.getClass().getName(), event, e.getMessage(), e);
- }
+ if (FeatureApiUtils.apply(getFeatureProviders(),
+ feature -> feature.beforeOnTopicEvent(this, commType, topic, event),
+ (feature, ex) -> logger.error(
+ "{}: feature {} beforeOnTopicEvent failure on event {} because of {}", this,
+ feature.getClass().getName(), event, ex.getMessage(), ex))) {
+ return;
}
/* configuration request */
@@ -1006,16 +914,11 @@ class PolicyEngineManager implements PolicyEngine {
}
/* policy-engine after topic event hook */
- for (final PolicyEngineFeatureApi feature : getFeatureProviders()) {
- try {
- if (feature.afterOnTopicEvent(this, configuration, commType, topic, event)) {
- return;
- }
- } catch (final Exception e) {
- logger.error("{}: feature {} afterOnTopicEvent failure on event {} because of {}", this,
- feature.getClass().getName(), event, e.getMessage(), e);
- }
- }
+ PdpdConfiguration configuration2 = configuration;
+ FeatureApiUtils.apply(getFeatureProviders(),
+ feature -> feature.afterOnTopicEvent(this, configuration2, commType, topic, event),
+ (feature, ex) -> logger.error("{}: feature {} afterOnTopicEvent failure on event {} because of {}", this,
+ feature.getClass().getName(), event, ex.getMessage(), ex));
}
@Override
@@ -1041,7 +944,7 @@ class PolicyEngineManager implements PolicyEngine {
throw new IllegalStateException(ENGINE_LOCKED_MSG);
}
- final List<? extends TopicSink> topicSinks = getTopicEndpointManager().getTopicSinks(topic);
+ final List<TopicSink> topicSinks = getTopicEndpointManager().getTopicSinks(topic);
if (topicSinks == null || topicSinks.size() != 1) {
throw new IllegalStateException("Cannot ensure correct delivery on topic " + topic + ": " + topicSinks);
}
@@ -1056,11 +959,11 @@ class PolicyEngineManager implements PolicyEngine {
* Note this entry point is usually from the DRL (one of the reasons busType is String.
*/
- if (busType == null || busType.isEmpty()) {
+ if (StringUtils.isBlank(busType)) {
throw new IllegalArgumentException("Invalid Communication Infrastructure");
}
- if (topic == null || topic.isEmpty()) {
+ if (StringUtils.isBlank(topic)) {
throw new IllegalArgumentException(INVALID_TOPIC_MSG);
}
@@ -1068,12 +971,8 @@ class PolicyEngineManager implements PolicyEngine {
throw new IllegalArgumentException(INVALID_EVENT_MSG);
}
- boolean valid = false;
- for (final Topic.CommInfrastructure comm : Topic.CommInfrastructure.values()) {
- if (comm.name().equals(busType)) {
- valid = true;
- }
- }
+ boolean valid = Stream.of(Topic.CommInfrastructure.values()).map(Enum::name)
+ .anyMatch(name -> name.equals(busType));
if (!valid) {
throw new IllegalArgumentException("Invalid Communication Infrastructure: " + busType);
@@ -1181,15 +1080,11 @@ class PolicyEngineManager implements PolicyEngine {
public synchronized void activate() {
/* policy-engine dispatch pre activate hook */
- for (final PolicyEngineFeatureApi feature : getEngineProviders()) {
- try {
- if (feature.beforeActivate(this)) {
- return;
- }
- } catch (final Exception e) {
- logger.error("{}: feature {} before-activate failure because of {}", this, feature.getClass().getName(),
- e.getMessage(), e);
- }
+ if (FeatureApiUtils.apply(getEngineProviders(),
+ feature -> feature.beforeActivate(this),
+ (feature, ex) -> logger.error("{}: feature {} before-activate failure because of {}", this,
+ feature.getClass().getName(), ex.getMessage(), ex))) {
+ return;
}
// activate 'policy-management'
@@ -1209,31 +1104,21 @@ class PolicyEngineManager implements PolicyEngine {
this.unlock();
/* policy-engine dispatch post activate hook */
- for (final PolicyEngineFeatureApi feature : getEngineProviders()) {
- try {
- if (feature.afterActivate(this)) {
- return;
- }
- } catch (final Exception e) {
- logger.error("{}: feature {} after-activate failure because of {}", this, feature.getClass().getName(),
- e.getMessage(), e);
- }
- }
+ FeatureApiUtils.apply(getEngineProviders(),
+ feature -> feature.afterActivate(this),
+ (feature, ex) -> logger.error("{}: feature {} after-activate failure because of {}", this,
+ feature.getClass().getName(), ex.getMessage(), ex));
}
@Override
public synchronized void deactivate() {
/* policy-engine dispatch pre deactivate hook */
- for (final PolicyEngineFeatureApi feature : getEngineProviders()) {
- try {
- if (feature.beforeDeactivate(this)) {
- return;
- }
- } catch (final Exception e) {
- logger.error("{}: feature {} before-deactivate failure because of {}", this,
- feature.getClass().getName(), e.getMessage(), e);
- }
+ if (FeatureApiUtils.apply(getEngineProviders(),
+ feature -> feature.beforeDeactivate(this),
+ (feature, ex) -> logger.error("{}: feature {} before-deactivate failure because of {}", this,
+ feature.getClass().getName(), ex.getMessage(), ex))) {
+ return;
}
this.lock();
@@ -1248,16 +1133,10 @@ class PolicyEngineManager implements PolicyEngine {
}
/* policy-engine dispatch post deactivate hook */
- for (final PolicyEngineFeatureApi feature : getEngineProviders()) {
- try {
- if (feature.afterDeactivate(this)) {
- return;
- }
- } catch (final Exception e) {
- logger.error("{}: feature {} after-deactivate failure because of {}", this,
- feature.getClass().getName(), e.getMessage(), e);
- }
- }
+ FeatureApiUtils.apply(getEngineProviders(),
+ feature -> feature.afterDeactivate(this),
+ (feature, ex) -> logger.error("{}: feature {} after-deactivate failure because of {}", this,
+ feature.getClass().getName(), ex.getMessage(), ex));
}
private boolean controllerConfig(PdpdConfiguration config) {
@@ -1269,12 +1148,8 @@ class PolicyEngineManager implements PolicyEngine {
}
final List<PolicyController> policyControllers = this.updatePolicyControllers(config.getControllers());
- boolean success = false;
- if (!(policyControllers == null || policyControllers.isEmpty())
- && (policyControllers.size() == configControllers.size())) {
- success = true;
- }
- return success;
+ return (policyControllers != null && !policyControllers.isEmpty()
+ && policyControllers.size() == configControllers.size());
}
@Override
diff --git a/policy-management/src/main/java/org/onap/policy/drools/system/internal/AggregatedPolicyController.java b/policy-management/src/main/java/org/onap/policy/drools/system/internal/AggregatedPolicyController.java
index 5d915104..aa57abaf 100644
--- a/policy-management/src/main/java/org/onap/policy/drools/system/internal/AggregatedPolicyController.java
+++ b/policy-management/src/main/java/org/onap/policy/drools/system/internal/AggregatedPolicyController.java
@@ -34,6 +34,7 @@ import org.onap.policy.common.endpoints.event.comm.TopicListener;
import org.onap.policy.common.endpoints.event.comm.TopicSink;
import org.onap.policy.common.endpoints.event.comm.TopicSource;
import org.onap.policy.common.gson.annotation.GsonJsonIgnore;
+import org.onap.policy.common.utils.services.FeatureApiUtils;
import org.onap.policy.drools.controller.DroolsController;
import org.onap.policy.drools.controller.DroolsControllerConstants;
import org.onap.policy.drools.controller.DroolsControllerFactory;
@@ -54,6 +55,9 @@ import org.slf4j.LoggerFactory;
*/
public class AggregatedPolicyController implements PolicyController, TopicListener {
+ private static final String BEFORE_OFFER_FAILURE = "{}: feature {} before-offer failure because of {}";
+ private static final String AFTER_OFFER_FAILURE = "{}: feature {} after-offer failure because of {}";
+
/**
* Logger.
*/
@@ -67,12 +71,12 @@ public class AggregatedPolicyController implements PolicyController, TopicListen
/**
* Abstracted Event Sources List regardless communication technology.
*/
- private final List<? extends TopicSource> sources;
+ private final List<TopicSource> sources;
/**
* Abstracted Event Sinks List regardless communication technology.
*/
- private final List<? extends TopicSink> sinks;
+ private final List<TopicSink> sinks;
/**
* Mapping topics to sinks.
@@ -273,15 +277,11 @@ public class AggregatedPolicyController implements PolicyController, TopicListen
public boolean start() {
logger.info("{}: start", this);
- for (PolicyControllerFeatureApi feature : getProviders()) {
- try {
- if (feature.beforeStart(this)) {
- return true;
- }
- } catch (Exception e) {
- logger.error("{}: feature {} before-start failure because of {}", this, feature.getClass().getName(),
- e.getMessage(), e);
- }
+ if (FeatureApiUtils.apply(getProviders(),
+ feature -> feature.beforeStart(this),
+ (feature, ex) -> logger.error("{}: feature {} before-start failure because of {}", this,
+ feature.getClass().getName(), ex.getMessage(), ex))) {
+ return true;
}
if (this.isLocked()) {
@@ -312,16 +312,10 @@ public class AggregatedPolicyController implements PolicyController, TopicListen
}
}
- for (PolicyControllerFeatureApi feature : getProviders()) {
- try {
- if (feature.afterStart(this)) {
- return true;
- }
- } catch (Exception e) {
- logger.error("{}: feature {} after-start failure because of {}", this, feature.getClass().getName(),
- e.getMessage(), e);
- }
- }
+ FeatureApiUtils.apply(getProviders(),
+ feature -> feature.afterStart(this),
+ (feature, ex) -> logger.error("{}: feature {} after-start failure because of {}", this,
+ feature.getClass().getName(), ex.getMessage(), ex));
return success;
}
@@ -333,15 +327,11 @@ public class AggregatedPolicyController implements PolicyController, TopicListen
public boolean stop() {
logger.info("{}: stop", this);
- for (PolicyControllerFeatureApi feature : getProviders()) {
- try {
- if (feature.beforeStop(this)) {
- return true;
- }
- } catch (Exception e) {
- logger.error("{}: feature {} before-stop failure because of {}", this, feature.getClass().getName(),
- e.getMessage(), e);
- }
+ if (FeatureApiUtils.apply(getProviders(),
+ feature -> feature.beforeStop(this),
+ (feature, ex) -> logger.error("{}: feature {} before-stop failure because of {}", this,
+ feature.getClass().getName(), ex.getMessage(), ex))) {
+ return true;
}
/* stop regardless locked state */
@@ -362,16 +352,10 @@ public class AggregatedPolicyController implements PolicyController, TopicListen
boolean success = this.droolsController.stop();
- for (PolicyControllerFeatureApi feature : getProviders()) {
- try {
- if (feature.afterStop(this)) {
- return true;
- }
- } catch (Exception e) {
- logger.error("{}: feature {} after-stop failure because of {}", this, feature.getClass().getName(),
- e.getMessage(), e);
- }
- }
+ FeatureApiUtils.apply(getProviders(),
+ feature -> feature.afterStop(this),
+ (feature, ex) -> logger.error("{}: feature {} after-stop failure because of {}", this,
+ feature.getClass().getName(), ex.getMessage(), ex));
return success;
}
@@ -383,31 +367,21 @@ public class AggregatedPolicyController implements PolicyController, TopicListen
public void shutdown() {
logger.info("{}: shutdown", this);
- for (PolicyControllerFeatureApi feature : getProviders()) {
- try {
- if (feature.beforeShutdown(this)) {
- return;
- }
- } catch (Exception e) {
- logger.error("{}: feature {} before-shutdown failure because of {}", this, feature.getClass().getName(),
- e.getMessage(), e);
- }
+ if (FeatureApiUtils.apply(getProviders(),
+ feature -> feature.beforeShutdown(this),
+ (feature, ex) -> logger.error("{}: feature {} before-shutdown failure because of {}", this,
+ feature.getClass().getName(), ex.getMessage(), ex))) {
+ return;
}
this.stop();
getDroolsFactory().shutdown(this.droolsController);
- for (PolicyControllerFeatureApi feature : getProviders()) {
- try {
- if (feature.afterShutdown(this)) {
- return;
- }
- } catch (Exception e) {
- logger.error("{}: feature {} after-shutdown failure because of {}", this, feature.getClass().getName(),
- e.getMessage(), e);
- }
- }
+ FeatureApiUtils.apply(getProviders(),
+ feature -> feature.afterShutdown(this),
+ (feature, ex) -> logger.error("{}: feature {} after-shutdown failure because of {}", this,
+ feature.getClass().getName(), ex.getMessage(), ex));
}
/**
@@ -417,31 +391,21 @@ public class AggregatedPolicyController implements PolicyController, TopicListen
public void halt() {
logger.info("{}: halt", this);
- for (PolicyControllerFeatureApi feature : getProviders()) {
- try {
- if (feature.beforeHalt(this)) {
- return;
- }
- } catch (Exception e) {
- logger.error("{}: feature {} before-halt failure because of {}", this, feature.getClass().getName(),
- e.getMessage(), e);
- }
+ if (FeatureApiUtils.apply(getProviders(),
+ feature -> feature.beforeHalt(this),
+ (feature, ex) -> logger.error("{}: feature {} before-halt failure because of {}", this,
+ feature.getClass().getName(), ex.getMessage(), ex))) {
+ return;
}
this.stop();
getDroolsFactory().destroy(this.droolsController);
getPersistenceManager().deleteController(this.name);
- for (PolicyControllerFeatureApi feature : getProviders()) {
- try {
- if (feature.afterHalt(this)) {
- return;
- }
- } catch (Exception e) {
- logger.error("{}: feature {} after-halt failure because of {}", this, feature.getClass().getName(),
- e.getMessage(), e);
- }
- }
+ FeatureApiUtils.apply(getProviders(),
+ feature -> feature.afterHalt(this),
+ (feature, ex) -> logger.error("{}: feature {} after-halt failure because of {}", this,
+ feature.getClass().getName(), ex.getMessage(), ex));
}
/**
@@ -455,29 +419,19 @@ public class AggregatedPolicyController implements PolicyController, TopicListen
return;
}
- for (PolicyControllerFeatureApi feature : getProviders()) {
- try {
- if (feature.beforeOffer(this, commType, topic, event)) {
- return;
- }
- } catch (Exception e) {
- logger.error("{}: feature {} before-offer failure because of {}", this, feature.getClass().getName(),
- e.getMessage(), e);
- }
+ if (FeatureApiUtils.apply(getProviders(),
+ feature -> feature.beforeOffer(this, commType, topic, event),
+ (feature, ex) -> logger.error(BEFORE_OFFER_FAILURE, this,
+ feature.getClass().getName(), ex.getMessage(), ex))) {
+ return;
}
boolean success = this.droolsController.offer(topic, event);
- for (PolicyControllerFeatureApi feature : getProviders()) {
- try {
- if (feature.afterOffer(this, commType, topic, event, success)) {
- return;
- }
- } catch (Exception e) {
- logger.error("{}: feature {} after-offer failure because of {}", this, feature.getClass().getName(),
- e.getMessage(), e);
- }
- }
+ FeatureApiUtils.apply(getProviders(),
+ feature -> feature.afterOffer(this, commType, topic, event, success),
+ (feature, ex) -> logger.error(AFTER_OFFER_FAILURE, this,
+ feature.getClass().getName(), ex.getMessage(), ex));
}
@Override
@@ -488,29 +442,19 @@ public class AggregatedPolicyController implements PolicyController, TopicListen
return true;
}
- for (PolicyControllerFeatureApi feature : getProviders()) {
- try {
- if (feature.beforeOffer(this, event)) {
- return true;
- }
- } catch (Exception e) {
- logger.error("{}: feature {} before-offer failure because of {}", this, feature.getClass().getName(),
- e.getMessage(), e);
- }
+ if (FeatureApiUtils.apply(getProviders(),
+ feature -> feature.beforeOffer(this, event),
+ (feature, ex) -> logger.error(BEFORE_OFFER_FAILURE, this,
+ feature.getClass().getName(), ex.getMessage(), ex))) {
+ return true;
}
boolean success = this.droolsController.offer(event);
- for (PolicyControllerFeatureApi feature : getProviders()) {
- try {
- if (feature.afterOffer(this, event, success)) {
- return success;
- }
- } catch (Exception e) {
- logger.error("{}: feature {} after-offer failure because of {}", this, feature.getClass().getName(),
- e.getMessage(), e);
- }
- }
+ FeatureApiUtils.apply(getProviders(),
+ feature -> feature.afterOffer(this, event, success),
+ (feature, ex) -> logger.error(AFTER_OFFER_FAILURE, this,
+ feature.getClass().getName(), ex.getMessage(), ex));
return success;
}
@@ -527,15 +471,11 @@ public class AggregatedPolicyController implements PolicyController, TopicListen
logger.debug("{}: deliver event to {}:{}: {}", this, commType, topic, event);
- for (PolicyControllerFeatureApi feature : getProviders()) {
- try {
- if (feature.beforeDeliver(this, commType, topic, event)) {
- return true;
- }
- } catch (Exception e) {
- logger.error("{}: feature {} before-deliver failure because of {}", this, feature.getClass().getName(),
- e.getMessage(), e);
- }
+ if (FeatureApiUtils.apply(getProviders(),
+ feature -> feature.beforeDeliver(this, commType, topic, event),
+ (feature, ex) -> logger.error("{}: feature {} before-deliver failure because of {}", this,
+ feature.getClass().getName(), ex.getMessage(), ex))) {
+ return true;
}
if (topic == null || topic.isEmpty()) {
@@ -562,16 +502,10 @@ public class AggregatedPolicyController implements PolicyController, TopicListen
boolean success = this.droolsController.deliver(this.topic2Sinks.get(topic), event);
- for (PolicyControllerFeatureApi feature : getProviders()) {
- try {
- if (feature.afterDeliver(this, commType, topic, event, success)) {
- return success;
- }
- } catch (Exception e) {
- logger.error("{}: feature {} after-deliver failure because of {}", this, feature.getClass().getName(),
- e.getMessage(), e);
- }
- }
+ FeatureApiUtils.apply(getProviders(),
+ feature -> feature.afterDeliver(this, commType, topic, event, success),
+ (feature, ex) -> logger.error("{}: feature {} after-deliver failure because of {}", this,
+ feature.getClass().getName(), ex.getMessage(), ex));
return success;
}
@@ -591,15 +525,11 @@ public class AggregatedPolicyController implements PolicyController, TopicListen
public boolean lock() {
logger.info("{}: lock", this);
- for (PolicyControllerFeatureApi feature : getProviders()) {
- try {
- if (feature.beforeLock(this)) {
- return true;
- }
- } catch (Exception e) {
- logger.error("{}: feature {} before-lock failure because of {}", this, feature.getClass().getName(),
- e.getMessage(), e);
- }
+ if (FeatureApiUtils.apply(getProviders(),
+ feature -> feature.beforeLock(this),
+ (feature, ex) -> logger.error("{}: feature {} before-lock failure because of {}", this,
+ feature.getClass().getName(), ex.getMessage(), ex))) {
+ return true;
}
synchronized (this) {
@@ -615,16 +545,10 @@ public class AggregatedPolicyController implements PolicyController, TopicListen
boolean success = this.droolsController.lock();
- for (PolicyControllerFeatureApi feature : getProviders()) {
- try {
- if (feature.afterLock(this)) {
- return true;
- }
- } catch (Exception e) {
- logger.error("{}: feature {} after-lock failure because of {}", this, feature.getClass().getName(),
- e.getMessage(), e);
- }
- }
+ FeatureApiUtils.apply(getProviders(),
+ feature -> feature.afterLock(this),
+ (feature, ex) -> logger.error("{}: feature {} after-lock failure because of {}", this,
+ feature.getClass().getName(), ex.getMessage(), ex));
return success;
}
@@ -637,15 +561,11 @@ public class AggregatedPolicyController implements PolicyController, TopicListen
logger.info("{}: unlock", this);
- for (PolicyControllerFeatureApi feature : getProviders()) {
- try {
- if (feature.beforeUnlock(this)) {
- return true;
- }
- } catch (Exception e) {
- logger.error("{}: feature {} before-unlock failure because of {}", this, feature.getClass().getName(),
- e.getMessage(), e);
- }
+ if (FeatureApiUtils.apply(getProviders(),
+ feature -> feature.beforeUnlock(this),
+ (feature, ex) -> logger.error("{}: feature {} before-unlock failure because of {}", this,
+ feature.getClass().getName(), ex.getMessage(), ex))) {
+ return true;
}
synchronized (this) {
@@ -658,16 +578,10 @@ public class AggregatedPolicyController implements PolicyController, TopicListen
boolean success = this.droolsController.unlock();
- for (PolicyControllerFeatureApi feature : getProviders()) {
- try {
- if (feature.afterUnlock(this)) {
- return true;
- }
- } catch (Exception e) {
- logger.error("{}: feature {} after-unlock failure because of {}", this, feature.getClass().getName(),
- e.getMessage(), e);
- }
- }
+ FeatureApiUtils.apply(getProviders(),
+ feature -> feature.afterUnlock(this),
+ (feature, ex) -> logger.error("{}: feature {} after-unlock failure because of {}", this,
+ feature.getClass().getName(), ex.getMessage(), ex));
return success;
}
@@ -684,7 +598,7 @@ public class AggregatedPolicyController implements PolicyController, TopicListen
* {@inheritDoc}.
*/
@Override
- public List<? extends TopicSource> getTopicSources() {
+ public List<TopicSource> getTopicSources() {
return this.sources;
}
@@ -692,7 +606,7 @@ public class AggregatedPolicyController implements PolicyController, TopicListen
* {@inheritDoc}.
*/
@Override
- public List<? extends TopicSink> getTopicSinks() {
+ public List<TopicSink> getTopicSinks() {
return this.sinks;
}
diff --git a/policy-management/src/test/java/org/onap/policy/drools/controller/internal/MavenDroolsControllerTest.java b/policy-management/src/test/java/org/onap/policy/drools/controller/internal/MavenDroolsControllerTest.java
index 4c262775..0d8bdfab 100644
--- a/policy-management/src/test/java/org/onap/policy/drools/controller/internal/MavenDroolsControllerTest.java
+++ b/policy-management/src/test/java/org/onap/policy/drools/controller/internal/MavenDroolsControllerTest.java
@@ -22,7 +22,10 @@ package org.onap.policy.drools.controller.internal;
import java.io.IOException;
import java.nio.file.Paths;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import org.junit.Assert;
+import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.kie.api.builder.ReleaseId;
@@ -42,13 +45,15 @@ public class MavenDroolsControllerTest {
private static volatile ReleaseId releaseId;
+ private static volatile CountDownLatch running;
+
/**
* Set up.
*
* @throws IOException throws an IO exception
*/
@BeforeClass
- public static void setUp() throws IOException {
+ public static void setUpBeforeClass() throws IOException {
releaseId =
KieUtils.installArtifact(Paths.get(JUNIT_ECHO_KMODULE_PATH).toFile(),
Paths.get(JUNIT_ECHO_KMODULE_POM_PATH).toFile(),
@@ -56,6 +61,15 @@ public class MavenDroolsControllerTest {
Paths.get(JUNIT_ECHO_KMODULE_DRL_PATH).toFile());
}
+ @Before
+ public void setUp() {
+ running = new CountDownLatch(1);
+ }
+
+ public static void setRunning() {
+ running.countDown();
+ }
+
@Test
public void stop() throws InterruptedException {
createDroolsController(10000L).stop();
@@ -106,8 +120,8 @@ public class MavenDroolsControllerTest {
Assert.assertEquals(releaseId.getArtifactId(), controller.getContainer().getArtifactId());
Assert.assertEquals(releaseId.getVersion(), controller.getContainer().getVersion());
- /* courtesy timer to allow full initialization from local maven repository */
- Thread.sleep(courtesyStartTimeMs);
+ /* allow full initialization from local maven repository */
+ Assert.assertTrue(running.await(courtesyStartTimeMs, TimeUnit.MILLISECONDS));
Assert.assertEquals(1, controller.getSessionNames().size());
Assert.assertEquals(JUNIT_ECHO_KSESSION, controller.getSessionNames().get(0));
diff --git a/policy-management/src/test/java/org/onap/policy/drools/protocol/coders/ProtocolCoderToolsetTest.java b/policy-management/src/test/java/org/onap/policy/drools/protocol/coders/ProtocolCoderToolsetTest.java
index bd595725..7787a7b6 100644
--- a/policy-management/src/test/java/org/onap/policy/drools/protocol/coders/ProtocolCoderToolsetTest.java
+++ b/policy-management/src/test/java/org/onap/policy/drools/protocol/coders/ProtocolCoderToolsetTest.java
@@ -231,7 +231,7 @@ public class ProtocolCoderToolsetTest {
Properties sinkConfig = new Properties();
sinkConfig.put(PolicyEndPointProperties.PROPERTY_NOOP_SINK_TOPICS, JUNIT_PROTOCOL_CODER_TOPIC);
- final List<? extends TopicSink> noopTopics = TopicEndpointManager.getManager().addTopicSinks(sinkConfig);
+ final List<TopicSink> noopTopics = TopicEndpointManager.getManager().addTopicSinks(sinkConfig);
Properties droolsControllerConfig = new Properties();
droolsControllerConfig.put(DroolsPropertyConstants.RULES_GROUPID, releaseId.getGroupId());
diff --git a/policy-management/src/test/java/org/onap/policy/drools/server/restful/test/RestManagerTest.java b/policy-management/src/test/java/org/onap/policy/drools/server/restful/test/RestManagerTest.java
index 173c1738..237bd4df 100644
--- a/policy-management/src/test/java/org/onap/policy/drools/server/restful/test/RestManagerTest.java
+++ b/policy-management/src/test/java/org/onap/policy/drools/server/restful/test/RestManagerTest.java
@@ -171,13 +171,17 @@ public class RestManagerTest {
* @throws InterruptedException Interrupted exception
*/
@AfterClass
- public static void tearDown() throws IOException, InterruptedException {
+ public static void tearDown() throws IOException {
+ try {
+ client.close();
+ } catch (IOException ex) {
+ logger.warn("cannot close HTTP client connection", ex);
+ }
+
/* Shutdown managed resources */
PolicyControllerConstants.getFactory().shutdown();
TopicEndpointManager.getManager().shutdown();
PolicyEngineConstants.getManager().stop();
- Thread.sleep(10000L);
- client.close();
cleanUpWorkingDirs();
}
diff --git a/policy-management/src/test/java/org/onap/policy/drools/system/PolicyEngineTest.java b/policy-management/src/test/java/org/onap/policy/drools/system/PolicyEngineTest.java
index df1f6cca..997fc03e 100644
--- a/policy-management/src/test/java/org/onap/policy/drools/system/PolicyEngineTest.java
+++ b/policy-management/src/test/java/org/onap/policy/drools/system/PolicyEngineTest.java
@@ -20,6 +20,7 @@
package org.onap.policy.drools.system;
+import static org.awaitility.Awaitility.await;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
@@ -29,6 +30,7 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Properties;
+import java.util.concurrent.TimeUnit;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.FixMethodOrder;
@@ -302,7 +304,6 @@ public class PolicyEngineTest {
TopicEndpointManager.getManager().shutdown();
PolicyEngineConstants.getManager().stop();
- Thread.sleep(10000L);
- assertFalse(PolicyEngineConstants.getManager().isAlive());
+ await().atMost(10, TimeUnit.SECONDS).until(() -> !PolicyEngineConstants.getManager().isAlive());
}
}
diff --git a/policy-management/src/test/java/org/onap/policy/drools/system/internal/AggregatedPolicyControllerTest.java b/policy-management/src/test/java/org/onap/policy/drools/system/internal/AggregatedPolicyControllerTest.java
index 6f09ab9b..695893d4 100644
--- a/policy-management/src/test/java/org/onap/policy/drools/system/internal/AggregatedPolicyControllerTest.java
+++ b/policy-management/src/test/java/org/onap/policy/drools/system/internal/AggregatedPolicyControllerTest.java
@@ -924,7 +924,7 @@ public class AggregatedPolicyControllerTest {
// remaining methods should not have been invoked
assertThatThrownBy(() -> verifyBefore.accept(prov2)).isInstanceOf(AssertionError.class);
- assertThatThrownBy(() -> verifyMiddle.run()).isInstanceOf(AssertionError.class);
+ assertThatThrownBy(verifyMiddle::run).isInstanceOf(AssertionError.class);
assertThatThrownBy(() -> verifyAfter.accept(prov1)).isInstanceOf(AssertionError.class);
assertThatThrownBy(() -> verifyAfter.accept(prov2)).isInstanceOf(AssertionError.class);
diff --git a/policy-management/src/test/resources/echo.drl b/policy-management/src/test/resources/echo.drl
index 664df639..bd26f95b 100644
--- a/policy-management/src/test/resources/echo.drl
+++ b/policy-management/src/test/resources/echo.drl
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* ONAP
* ================================================================================
- * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -20,10 +20,13 @@
package org.onap.policy.drools.test;
+import org.onap.policy.drools.controller.internal.MavenDroolsControllerTest;
+
rule "INIT"
lock-on-active
when
then
+ MavenDroolsControllerTest.setRunning();
insert(new String("hello,I am up"));
end
diff --git a/policy-utils/src/main/java/org/onap/policy/drools/utils/logging/MdcTransactionImpl.java b/policy-utils/src/main/java/org/onap/policy/drools/utils/logging/MdcTransactionImpl.java
index f10343e2..ac90ab96 100644
--- a/policy-utils/src/main/java/org/onap/policy/drools/utils/logging/MdcTransactionImpl.java
+++ b/policy-utils/src/main/java/org/onap/policy/drools/utils/logging/MdcTransactionImpl.java
@@ -55,6 +55,7 @@ import java.time.Duration;
import java.time.Instant;
import java.util.Date;
import java.util.UUID;
+import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
@@ -221,111 +222,45 @@ class MdcTransactionImpl implements MdcTransaction {
*/
@Override
public MdcTransaction flush() {
- if (this.requestId != null && !this.requestId.isEmpty()) {
- MDC.put(REQUEST_ID, this.requestId);
- }
-
- if (this.invocationId != null && !this.invocationId.isEmpty()) {
- MDC.put(INVOCATION_ID, this.invocationId);
- }
-
- if (this.partner != null) {
- MDC.put(PARTNER_NAME, this.partner);
- }
-
- if (this.virtualServerName != null) {
- MDC.put(VIRTUAL_SERVER_NAME, this.virtualServerName);
- }
-
- if (this.serverName != null) {
- MDC.put(SERVER, this.serverName);
- }
-
- if (this.serverIpAddress != null) {
- MDC.put(SERVER_IP_ADDRESS, this.serverIpAddress);
- }
-
- if (this.serverFqdn != null) {
- MDC.put(SERVER_FQDN, this.serverFqdn);
- }
-
- if (this.serviceName != null) {
- MDC.put(SERVICE_NAME, this.serviceName);
- }
-
- if (this.startTime != null) {
- MDC.put(BEGIN_TIMESTAMP, timestamp(this.startTime));
- }
-
- if (this.endTime != null) {
- MDC.put(END_TIMESTAMP, timestamp(this.endTime));
- } else {
- this.setEndTime(null);
- MDC.put(END_TIMESTAMP, timestamp(this.endTime));
- }
+ setMdc(REQUEST_ID, this.requestId);
+ setMdc(INVOCATION_ID, this.invocationId);
+ setMdc(PARTNER_NAME, this.partner);
+ setMdc(VIRTUAL_SERVER_NAME, this.virtualServerName);
+ setMdc(SERVER, this.serverName);
+ setMdc(SERVER_IP_ADDRESS, this.serverIpAddress);
+ setMdc(SERVER_FQDN, this.serverFqdn);
+ setMdc(SERVICE_NAME, this.serviceName);
+ setMdc(BEGIN_TIMESTAMP, timestamp(this.startTime));
+ setMdc(END_TIMESTAMP, timestamp(this.endTime));
if (this.elapsedTime != null) {
MDC.put(ELAPSED_TIME, String.valueOf(this.elapsedTime));
- } else {
- if (endTime != null && startTime != null) {
- this.elapsedTime = Duration.between(startTime, endTime).toMillis();
- MDC.put(ELAPSED_TIME, String.valueOf(this.elapsedTime));
- }
- }
-
- if (this.serviceInstanceId != null) {
- MDC.put(SERVICE_INSTANCE_ID, this.serviceInstanceId);
- }
-
- if (this.instanceUuid != null) {
- MDC.put(INSTANCE_UUID, this.instanceUuid);
- }
-
- if (this.processKey != null) {
- MDC.put(PROCESS_KEY, this.processKey);
- }
-
- if (this.statusCode != null) {
- MDC.put(STATUS_CODE, this.statusCode);
- }
-
- if (this.responseCode != null) {
- MDC.put(RESPONSE_CODE, this.responseCode);
- }
-
- if (this.responseDescription != null) {
- MDC.put(RESPONSE_DESCRIPTION, this.responseDescription);
- }
-
- if (this.theSeverity != null) {
- MDC.put(SEVERITY, this.theSeverity);
- }
-
- if (this.alertSeverity != null) {
- MDC.put(ALERT_SEVERITY, this.alertSeverity);
- }
-
- if (this.targetEntity != null) {
- MDC.put(TARGET_ENTITY, this.targetEntity);
- }
-
- if (this.targetServiceName != null) {
- MDC.put(TARGET_SERVICE_NAME, this.targetServiceName);
+ } else if (endTime != null && startTime != null) {
+ this.elapsedTime = Duration.between(startTime, endTime).toMillis();
+ MDC.put(ELAPSED_TIME, String.valueOf(this.elapsedTime));
}
- if (this.targetVirtualEntity != null) {
- MDC.put(TARGET_VIRTUAL_ENTITY, this.targetVirtualEntity);
- }
+ setMdc(SERVICE_INSTANCE_ID, this.serviceInstanceId);
+ setMdc(INSTANCE_UUID, this.instanceUuid);
+ setMdc(PROCESS_KEY, this.processKey);
+ setMdc(STATUS_CODE, this.statusCode);
+ setMdc(RESPONSE_CODE, this.responseCode);
+ setMdc(RESPONSE_DESCRIPTION, this.responseDescription);
+ setMdc(SEVERITY, this.theSeverity);
+ setMdc(ALERT_SEVERITY, this.alertSeverity);
+ setMdc(TARGET_ENTITY, this.targetEntity);
+ setMdc(TARGET_SERVICE_NAME, this.targetServiceName);
+ setMdc(TARGET_VIRTUAL_ENTITY, this.targetVirtualEntity);
+ setMdc(CLIENT_IP_ADDRESS, this.clientIpAddress);
+ setMdc(REMOTE_HOST, this.remoteHost);
- if (this.clientIpAddress != null) {
- MDC.put(CLIENT_IP_ADDRESS, this.clientIpAddress);
- }
+ return this;
+ }
- if (this.remoteHost != null) {
- MDC.put(REMOTE_HOST, this.remoteHost);
+ private void setMdc(String paramName, String value) {
+ if (!StringUtils.isBlank(value)) {
+ MDC.put(paramName, value);
}
-
- return this;
}
@Override
@@ -689,6 +624,10 @@ class MdcTransactionImpl implements MdcTransaction {
@Override
public String timestamp(Instant time) {
+ if (time == null) {
+ return null;
+ }
+
return new SimpleDateFormat(DATE_FORMAT).format(Date.from(time));
}