summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJim Hahn <jrh3@att.com>2019-08-14 17:31:50 -0400
committerJim Hahn <jrh3@att.com>2019-08-15 11:23:31 -0400
commit59e9b9a8b56d563814ef21a23716959f772f9194 (patch)
treef152aea1578a82737501f56916ca07d8e7889d18
parenta156cf3cbad6512510ae9a02a13c0408f901c734 (diff)
Fix more sonar issues in drools-pdp
Addressed issues of cyclomatic complexity and deep nesting by refactoring code into separate methods. In some cases, had to refactor the code into nested classes to avoid passing too many parameters to the newly extracted methods. Addressed issue "too many conditionals" by breaking conditionals apart. Addressed issue "Remove usage of generic wildcard type" by eliminating "? extends" from return values. Addressed issue "Remove this use of 'Thread.sleep()'" in junit tests by introducing latches or using Awaitility. Note: this won't build until ApiUtils has been merged. Change-Id: I0d5596b4cb918a36bc22f426f426bd238195b458 Issue-ID: POLICY-1968 Signed-off-by: Jim Hahn <jrh3@att.com>
-rw-r--r--feature-active-standby-management/src/main/java/org/onap/policy/drools/activestandby/DroolsPdpsElectionHandler.java1026
-rw-r--r--feature-active-standby-management/src/main/java/org/onap/policy/drools/activestandby/JpaDroolsPdpsConnector.java45
-rw-r--r--feature-active-standby-management/src/main/java/org/onap/policy/drools/activestandby/PmStandbyStateChangeNotifier.java259
-rw-r--r--feature-simulators/src/main/java/org/onap/policy/drools/simulators/DMaaPSimulatorJaxRs.java7
-rw-r--r--feature-simulators/src/test/java/org/onap/policy/drools/simulators/DMaaPSimulatorJaxRsTest.java14
-rw-r--r--feature-state-management/src/main/java/org/onap/policy/drools/statemanagement/RepositoryAudit.java488
-rw-r--r--policy-core/src/main/java/org/onap/policy/drools/core/PolicyContainer.java42
-rw-r--r--policy-management/pom.xml7
-rw-r--r--policy-management/src/main/java/org/onap/policy/drools/controller/IndexedDroolsControllerFactory.java127
-rw-r--r--policy-management/src/main/java/org/onap/policy/drools/controller/internal/MavenDroolsController.java139
-rw-r--r--policy-management/src/main/java/org/onap/policy/drools/protocol/coders/GenericEventProtocolCoder.java137
-rw-r--r--policy-management/src/main/java/org/onap/policy/drools/system/PolicyController.java4
-rw-r--r--policy-management/src/main/java/org/onap/policy/drools/system/PolicyEngineManager.java701
-rw-r--r--policy-management/src/main/java/org/onap/policy/drools/system/internal/AggregatedPolicyController.java264
-rw-r--r--policy-management/src/test/java/org/onap/policy/drools/controller/internal/MavenDroolsControllerTest.java20
-rw-r--r--policy-management/src/test/java/org/onap/policy/drools/protocol/coders/ProtocolCoderToolsetTest.java2
-rw-r--r--policy-management/src/test/java/org/onap/policy/drools/server/restful/test/RestManagerTest.java10
-rw-r--r--policy-management/src/test/java/org/onap/policy/drools/system/PolicyEngineTest.java5
-rw-r--r--policy-management/src/test/java/org/onap/policy/drools/system/internal/AggregatedPolicyControllerTest.java2
-rw-r--r--policy-management/src/test/resources/echo.drl5
-rw-r--r--policy-utils/src/main/java/org/onap/policy/drools/utils/logging/MdcTransactionImpl.java133
21 files changed, 1698 insertions, 1739 deletions
diff --git a/feature-active-standby-management/src/main/java/org/onap/policy/drools/activestandby/DroolsPdpsElectionHandler.java b/feature-active-standby-management/src/main/java/org/onap/policy/drools/activestandby/DroolsPdpsElectionHandler.java
index 33c8a8d8..85cf88b3 100644
--- a/feature-active-standby-management/src/main/java/org/onap/policy/drools/activestandby/DroolsPdpsElectionHandler.java
+++ b/feature-active-standby-management/src/main/java/org/onap/policy/drools/activestandby/DroolsPdpsElectionHandler.java
@@ -202,255 +202,7 @@ public class DroolsPdpsElectionHandler implements ThreadRunningChecker {
logger.debug("DesignatedWaiter.run: pdps.size= {}", pdps.size());
//This is only true if all designated PDPs have failed
- boolean designatedPdpHasFailed = pdpsConnector.hasDesignatedPdpFailed(pdps);
- logger.debug("DesignatedWaiter.run: designatedPdpHasFailed= {}", designatedPdpHasFailed);
- for (DroolsPdp pdp : pdps) {
- logger.debug("DesignatedWaiter.run: evaluating pdp ID: {}", pdp.getPdpId());
-
- /*
- * Note: side effect of isPdpCurrent is that any stale but
- * designated PDPs will be marked as un-designated.
- */
- boolean isCurrent = pdpsConnector.isPdpCurrent(pdp);
-
- /*
- * We can't use stateManagement.getStandbyStatus() here, because
- * we need the standbyStatus, not for this PDP, but for the PDP
- * being processed by this loop iteration.
- */
- String standbyStatus = stateManagementFeature.getStandbyStatus(pdp.getPdpId());
- if (standbyStatus == null) {
- // Treat this case as a cold standby -- if we
- // abort here, no sessions will be created in a
- // single-node test environment.
- standbyStatus = StateManagement.COLD_STANDBY;
- }
- logger.debug("DesignatedWaiter.run: PDP= {}, isCurrent= {}", pdp.getPdpId(), isCurrent);
-
- /*
- * There are 4 combinations of isDesignated and isCurrent. We will examine each one in-turn
- * and evaluate the each pdp in the list of pdps against each combination.
- *
- * This is the first combination of isDesignated and isCurrent
- */
- if (pdp.isDesignated() && isCurrent) {
- //It is current, but it could have a standbystatus=coldstandby / hotstandby
- //If so, we need to stand it down and demote it
- if (!standbyStatus.equals(StateManagement.PROVIDING_SERVICE)) {
- if (pdp.getPdpId().equals(myPdp.getPdpId())) {
- logger.debug("\n\nDesignatedWaiter.run: myPdp {} is current and designated, "
- + "butstandbystatus is not providingservice. "
- + " Executing stateManagement.demote()" + "\n\n", myPdp.getPdpId());
- // So, we must demote it
- try {
- //Keep the order like this. StateManagement is last since it
- //triggers controller shutdown
- //This will change isDesignated and it can enter another if(combination) below
- pdpsConnector.standDownPdp(pdp.getPdpId());
- myPdp.setDesignated(false);
- isDesignated = false;
- if (!(standbyStatus.equals(StateManagement.HOT_STANDBY)
- || standbyStatus.equals(StateManagement.COLD_STANDBY))) {
- /*
- * Only demote it if it appears it has not already been demoted. Don't worry
- * about synching with the topic endpoint states. That is done by the
- * refreshStateAudit
- */
- stateManagementFeature.demote();
- }
- //update the standbystatus to check in a later
- //combination of isDesignated and isCurrent
- standbyStatus = stateManagementFeature.getStandbyStatus(pdp.getPdpId());
- } catch (Exception e) {
- logger.error("DesignatedWaiter.run: myPdp: {} "
- + "Caught Exception attempting to demote myPdp,"
- + "message= {}", myPdp.getPdpId(), e);
- }
- } else {
- // Don't demote a remote PDP that is current. It should catch itself
- logger.debug("\n\nDesignatedWaiter.run: myPdp {} is current and designated, "
- + "but standbystatus is not providingservice. "
- + " Cannot execute stateManagement.demote() "
- + "since it it is not myPdp\n\n",
- myPdp.getPdpId());
- }
-
- } else {
- // If we get here, it is ok to be on the list
- logger.debug("DesignatedWaiter.run: PDP= {} is designated, "
- + "current and {} Noting PDP as "
- + "designated, standbyStatus= {}",
- pdp.getPdpId(), standbyStatus, standbyStatus);
- listOfDesignated.add(pdp);
- }
-
-
- }
-
-
- /*
- * The second combination of isDesignated and isCurrent
- *
- * PDP is designated but not current; it has failed.
- * So we stand it down (it doesn't matter what
- * its standbyStatus is). None of these go on the list.
- */
- if (pdp.isDesignated() && !isCurrent) {
- logger.debug("INFO: DesignatedWaiter.run: PDP= {} is currently "
- + "designated but is not current; "
- + "it has failed. Standing down. standbyStatus= {}",
- pdp.getPdpId(), standbyStatus);
- /*
- * Changes designated to 0 but it is still potentially providing service
- * Will affect isDesignated, so, it can enter an if(combination) below
- */
- pdpsConnector.standDownPdp(pdp.getPdpId());
-
- //need to change standbystatus to coldstandby
- if (pdp.getPdpId().equals(myPdp.getPdpId())) {
- logger.debug("\n\nDesignatedWaiter.run: myPdp {} is not Current. "
- + " Executing stateManagement.disableFailed()\n\n", myPdp.getPdpId());
- // We found that myPdp is designated but not current
- // So, we must cause it to disableFail
- try {
- myPdp.setDesignated(false);
- pdpsConnector.setDesignated(myPdp, false);
- isDesignated = false;
- stateManagementFeature.disableFailed();
- } catch (Exception e) {
- logger.error("DesignatedWaiter.run: myPdp: {} Caught Exception "
- + "attempting to disableFail myPdp {}, message= {}",
- myPdp.getPdpId(), myPdp.getPdpId(), e);
- }
- } else { //it is a remote PDP that is failed
- logger.debug("\n\nDesignatedWaiter.run: PDP {} is not Current. "
- + " Executing stateManagement.disableFailed(otherResourceName)\n\n",
- pdp.getPdpId() );
- // We found a PDP is designated but not current
- // We already called standdown(pdp) which will change designated to false
- // Now we need to disableFail it to get its states in synch. The standbyStatus
- // should equal coldstandby
- try {
- stateManagementFeature.disableFailed(pdp.getPdpId());
- } catch (Exception e) {
- logger.error("DesignatedWaiter.run: for PDP {} Caught Exception attempting to "
- + "disableFail({}), message= {}",
- pdp.getPdpId(), pdp.getPdpId(), e);
- }
-
- }
- continue; //we are not going to do anything else with this pdp
- }
-
- /*
- * The third combination of isDesignated and isCurrent
- * /*
- * If a PDP is not currently designated but is providing service
- * (erroneous, but recoverable) or hot standby
- * we can add it to the list of possible designated if all the designated have failed
- */
- if (!pdp.isDesignated() && isCurrent) {
- if (!(standbyStatus.equals(StateManagement.HOT_STANDBY)
- || standbyStatus.equals(StateManagement.COLD_STANDBY))) {
- logger.debug("\n\nDesignatedWaiter.run: PDP {}"
- + " is NOT designated but IS current and"
- + " has a standbystatus= {}", pdp.getPdpId(), standbyStatus);
- // Since it is current, we assume it can adjust its own state.
- // We will demote if it is myPdp
- if (pdp.getPdpId().equals(myPdp.getPdpId())) {
- //demote it
- logger.debug("DesignatedWaiter.run: PDP {} going to "
- + "setDesignated = false and calling stateManagement.demote",
- pdp.getPdpId());
- try {
- //Keep the order like this.
- //StateManagement is last since it triggers controller shutdown
- pdpsConnector.setDesignated(myPdp, false);
- myPdp.setDesignated(false);
- isDesignated = false;
- //This is definitely not a redundant call.
- //It is attempting to correct a problem
- stateManagementFeature.demote();
- //recheck the standbystatus
- standbyStatus = stateManagementFeature.getStandbyStatus(pdp.getPdpId());
- } catch (Exception e) {
- logger.error("DesignatedWaiter.run: myPdp: {} Caught Exception "
- + "attempting to demote myPdp {}, message = {}", myPdp.getPdpId(),
- myPdp.getPdpId(), e);
- }
-
- }
- }
- if (standbyStatus.equals(StateManagement.HOT_STANDBY) && designatedPdpHasFailed) {
- //add it to the list
- logger.debug("INFO: DesignatedWaiter.run: PDP= {}"
- + " is not designated but is {} and designated PDP "
- + "has failed. standbyStatus= {}", pdp.getPdpId(),
- standbyStatus, standbyStatus);
- listOfDesignated.add(pdp);
- }
- continue; //done with this one
- }
-
- /*
- * The fourth combination of isDesignated and isCurrent
- *
- * We are not going to put any of these on the list since it appears they have failed.
-
- *
- */
- if (!pdp.isDesignated() && !isCurrent) {
- logger.debug("INFO: DesignatedWaiter.run: PDP= {} "
- + "designated= {}, current= {}, "
- + "designatedPdpHasFailed= {}, "
- + "standbyStatus= {}",pdp.getPdpId(),
- pdp.isDesignated(), isCurrent, designatedPdpHasFailed, standbyStatus);
- if (!standbyStatus.equals(StateManagement.COLD_STANDBY)) {
- //stand it down
- //disableFail it
- pdpsConnector.standDownPdp(pdp.getPdpId());
- if (pdp.getPdpId().equals(myPdp.getPdpId())) {
- /*
- * I don't actually know how this condition could
- * happen, but if it did, we would want to declare it
- * failed.
- */
- logger.debug("\n\nDesignatedWaiter.run: myPdp {} is !current and !designated, "
- + " Executing stateManagement.disableFailed()\n\n",
- myPdp.getPdpId());
- // So, we must disableFail it
- try {
- //Keep the order like this.
- //StateManagement is last since it triggers controller shutdown
- pdpsConnector.setDesignated(myPdp, false);
- myPdp.setDesignated(false);
- isDesignated = false;
- stateManagementFeature.disableFailed();
- } catch (Exception e) {
- logger.error("DesignatedWaiter.run: myPdp: {} Caught Exception attempting to "
- + "disableFail myPdp {}, message= {}",
- myPdp.getPdpId(), myPdp.getPdpId(), e);
- }
- } else { //it is remote
- logger.debug("\n\nDesignatedWaiter.run: myPdp {} is !current and !designated, "
- + " Executing stateManagement.disableFailed({})\n\n",
- myPdp.getPdpId(), pdp.getPdpId());
- // We already called standdown(pdp) which will change designated to false
- // Now we need to disableFail it to get its states in sync.
- // StandbyStatus = coldstandby
- try {
- stateManagementFeature.disableFailed(pdp.getPdpId());
- } catch (Exception e) {
- logger.error("DesignatedWaiter.run: for PDP {}"
- + " Caught Exception attempting to disableFail({})"
- + ", message=", pdp.getPdpId(), pdp.getPdpId(), e);
- }
- }
- }
- }
-
-
- } // end pdps loop
+ allPdpsFailed(pdps, listOfDesignated);
/*
* We have checked the four combinations of isDesignated and isCurrent. Where appropriate,
@@ -479,7 +231,7 @@ public class DroolsPdpsElectionHandler implements ThreadRunningChecker {
/*
- * It is possible to get here with more than one pdp designated and providingservice. This normally
+ * It is possible to get here with more than one pdp designated and providing service. This normally
* occurs when there is a race condition with multiple nodes coming up at the same time. If that is
* the case we must determine which one is the one that should be designated and which one should
* be demoted.
@@ -490,82 +242,22 @@ public class DroolsPdpsElectionHandler implements ThreadRunningChecker {
* the previously designated pdp.
*/
DroolsPdp designatedPdp = computeDesignatedPdp(listOfDesignated, mostRecentPrimary);
- if (designatedPdp != null) {
- pdpdNowActive = designatedPdp.getPdpId();
- }
if (designatedPdp == null) {
logger.warn("WARNING: DesignatedWaiter.run: No viable PDP found to be Designated. "
+ "designatedPdp still null.");
- // Just to be sure the parameters are correctly set
- myPdp.setDesignated(false);
- pdpsConnector.setDesignated(myPdp,false);
- isDesignated = false;
-
- waitTimerLastRunDate = new Date();
- logger.debug("DesignatedWaiter.run (designatedPdp == null) waitTimerLastRunDate = {}",
- waitTimerLastRunDate);
- myPdp.setUpdatedDate(waitTimerLastRunDate);
- pdpsConnector.update(myPdp);
-
+ designateNoPdp();
return;
+ }
- } else if (designatedPdp.getPdpId().equals(myPdp.getPdpId())) {
- logger.debug("DesignatedWaiter.run: designatedPdp is PDP={}", myPdp.getPdpId());
- /*
- * update function expects myPdp.isDesignated to be true.
- */
- try {
- //Keep the order like this. StateManagement is last since it triggers controller init
- myPdp.setDesignated(true);
- myPdp.setDesignatedDate(new Date());
- pdpsConnector.setDesignated(myPdp, true);
- isDesignated = true;
- String standbyStatus = stateManagementFeature.getStandbyStatus();
- if (!standbyStatus.equals(StateManagement.PROVIDING_SERVICE)) {
- /*
- * Only call promote if it is not already in the right state. Don't worry about
- * synching the lower level topic endpoint states. That is done by the
- * refreshStateAudit.
- * Note that we need to fetch the session list from 'mostRecentPrimary'
- * at this point -- soon, 'mostRecentPrimary' will be set to this host.
- */
- //this.sessions = mostRecentPrimary.getSessions();
- stateManagementFeature.promote();
- }
- } catch (Exception e) {
- logger.error("ERROR: DesignatedWaiter.run: Caught Exception attempting to promote PDP={}"
- + ", message=", myPdp.getPdpId(), e);
- myPdp.setDesignated(false);
- pdpsConnector.setDesignated(myPdp,false);
- isDesignated = false;
- //If you can't promote it, demote it
- try {
- String standbyStatus = stateManagementFeature.getStandbyStatus();
- if (!(standbyStatus.equals(StateManagement.HOT_STANDBY)
- || standbyStatus.equals(StateManagement.COLD_STANDBY))) {
- /*
- * Only call demote if it is not already in the right state. Don't worry about
- * synching the lower level topic endpoint states. That is done by the
- * refreshStateAudit.
- */
- stateManagementFeature.demote();
- }
- } catch (Exception e1) {
- logger.error("ERROR: DesignatedWaiter.run: Caught StandbyStatusException "
- + "attempting to promote then demote PDP={}, message=",
- myPdp.getPdpId(), e1);
- }
-
- }
- waitTimerLastRunDate = new Date();
- logger.debug("DesignatedWaiter.run (designatedPdp.getPdpId().equals(myPdp.getPdpId())) "
- + "waitTimerLastRunDate = " + waitTimerLastRunDate);
- myPdp.setUpdatedDate(waitTimerLastRunDate);
- pdpsConnector.update(myPdp);
+ pdpdNowActive = designatedPdp.getPdpId();
+ if (pdpdNowActive.equals(myPdp.getPdpId())) {
+ logger.debug("DesignatedWaiter.run: designatedPdp is PDP={}", myPdp.getPdpId());
+ designateMyPdp();
return;
}
+
isDesignated = false;
} // end synchronized
@@ -583,6 +275,350 @@ public class DroolsPdpsElectionHandler implements ThreadRunningChecker {
logger.error("DesignatedWaiter.run caught an unexpected exception: ", e);
}
} // end run
+
+ private void allPdpsFailed(Collection<DroolsPdp> pdps, List<DroolsPdp> listOfDesignated) {
+ boolean designatedPdpHasFailed = pdpsConnector.hasDesignatedPdpFailed(pdps);
+ logger.debug("DesignatedWaiter.run: designatedPdpHasFailed= {}", designatedPdpHasFailed);
+ for (DroolsPdp pdp : pdps) {
+ logger.debug("DesignatedWaiter.run: evaluating pdp ID: {}", pdp.getPdpId());
+
+ /*
+ * Note: side effect of isPdpCurrent is that any stale but
+ * designated PDPs will be marked as un-designated.
+ */
+ boolean isCurrent = pdpsConnector.isPdpCurrent(pdp);
+
+ /*
+ * We can't use stateManagement.getStandbyStatus() here, because
+ * we need the standbyStatus, not for this PDP, but for the PDP
+ * being processed by this loop iteration.
+ */
+ String standbyStatus = stateManagementFeature.getStandbyStatus(pdp.getPdpId());
+ if (standbyStatus == null) {
+ // Treat this case as a cold standby -- if we
+ // abort here, no sessions will be created in a
+ // single-node test environment.
+ standbyStatus = StateManagement.COLD_STANDBY;
+ }
+ logger.debug("DesignatedWaiter.run: PDP= {}, isCurrent= {}", pdp.getPdpId(), isCurrent);
+
+ adjustPdp(pdp, isCurrent, designatedPdpHasFailed, standbyStatus, listOfDesignated);
+
+
+ } // end pdps loop
+ }
+
+ private void adjustPdp(DroolsPdp pdp, boolean isCurrent, boolean designatedPdpHasFailed, String standbyStatus,
+ List<DroolsPdp> listOfDesignated) {
+ /*
+ * There are 4 combinations of isDesignated and isCurrent. We will examine each one in-turn
+ * and evaluate the each pdp in the list of pdps against each combination.
+ */
+ if (pdp.isDesignated()) {
+ /*
+ * This is the first combination of isDesignated and isCurrent
+ */
+ if (isCurrent) {
+ pdpDesignatedCurrent(pdp, standbyStatus, listOfDesignated);
+
+ /*
+ * The second combination of isDesignated and isCurrent
+ *
+ * PDP is designated but not current; it has failed.
+ * So we stand it down (it doesn't matter what
+ * its standbyStatus is). None of these go on the list.
+ */
+ } else {
+ logger.debug("INFO: DesignatedWaiter.run: PDP= {} is currently "
+ + "designated but is not current; "
+ + "it has failed. Standing down. standbyStatus= {}",
+ pdp.getPdpId(), standbyStatus);
+ pdpDesignatedNotCurrent(pdp);
+ }
+
+ } else {
+ // NOT designated
+
+
+ /*
+ * The third combination of isDesignated and isCurrent
+ * /*
+ * If a PDP is not currently designated but is providing service
+ * (erroneous, but recoverable) or hot standby
+ * we can add it to the list of possible designated if all the designated have failed
+ */
+ if (isCurrent) {
+ pdpNotDesignatedCurrent(pdp, designatedPdpHasFailed, standbyStatus,
+ listOfDesignated);
+
+ /*
+ * The fourth combination of isDesignated and isCurrent
+ *
+ * We are not going to put any of these on the list since it appears they have failed.
+ *
+ */
+ } else {
+ logger.debug("INFO: DesignatedWaiter.run: PDP= {} "
+ + "designated= {}, current= {}, "
+ + "designatedPdpHasFailed= {}, "
+ + "standbyStatus= {}",pdp.getPdpId(),
+ pdp.isDesignated(), false, designatedPdpHasFailed, standbyStatus);
+ pdpNotDesignatedNotCurrent(pdp, standbyStatus);
+ }
+ }
+ }
+
+ private void pdpDesignatedCurrent(DroolsPdp pdp, String standbyStatus, List<DroolsPdp> listOfDesignated) {
+ //It is current, but it could have a standbystatus=coldstandby / hotstandby
+ //If so, we need to stand it down and demote it
+ if (!standbyStatus.equals(StateManagement.PROVIDING_SERVICE)) {
+ if (pdp.getPdpId().equals(myPdp.getPdpId())) {
+ logger.debug("\n\nDesignatedWaiter.run: myPdp {} is current and designated, "
+ + "butstandbystatus is not providingservice. "
+ + " Executing stateManagement.demote()" + "\n\n", myPdp.getPdpId());
+ // So, we must demote it
+ try {
+ demoteMyPdp(pdp, standbyStatus);
+ } catch (Exception e) {
+ logger.error("DesignatedWaiter.run: myPdp: {} "
+ + "Caught Exception attempting to demote myPdp,"
+ + "message= {}", myPdp.getPdpId(), e);
+ }
+ } else {
+ // Don't demote a remote PDP that is current. It should catch itself
+ logger.debug("\n\nDesignatedWaiter.run: myPdp {} is current and designated, "
+ + "but standbystatus is not providingservice. "
+ + " Cannot execute stateManagement.demote() "
+ + "since it it is not myPdp\n\n",
+ myPdp.getPdpId());
+ }
+
+ } else {
+ // If we get here, it is ok to be on the list
+ logger.debug("DesignatedWaiter.run: PDP= {} is designated, "
+ + "current and {} Noting PDP as "
+ + "designated, standbyStatus= {}",
+ pdp.getPdpId(), standbyStatus, standbyStatus);
+ listOfDesignated.add(pdp);
+ }
+ }
+
+ private void demoteMyPdp(DroolsPdp pdp, String standbyStatus) throws Exception {
+ //Keep the order like this. StateManagement is last since it
+ //triggers controller shutdown
+ //This will change isDesignated and it can enter another if(combination) below
+ pdpsConnector.standDownPdp(pdp.getPdpId());
+ myPdp.setDesignated(false);
+ isDesignated = false;
+ if (!(standbyStatus.equals(StateManagement.HOT_STANDBY)
+ || standbyStatus.equals(StateManagement.COLD_STANDBY))) {
+ /*
+ * Only demote it if it appears it has not already been demoted. Don't worry
+ * about synching with the topic endpoint states. That is done by the
+ * refreshStateAudit
+ */
+ stateManagementFeature.demote();
+ }
+ }
+
+ private void pdpDesignatedNotCurrent(DroolsPdp pdp) {
+ /*
+ * Changes designated to 0 but it is still potentially providing service
+ * Will affect isDesignated, so, it can enter an if(combination) below
+ */
+ pdpsConnector.standDownPdp(pdp.getPdpId());
+
+ //need to change standbystatus to coldstandby
+ if (pdp.getPdpId().equals(myPdp.getPdpId())) {
+ logger.debug("\n\nDesignatedWaiter.run: myPdp {} is not Current. "
+ + " Executing stateManagement.disableFailed()\n\n", myPdp.getPdpId());
+ // We found that myPdp is designated but not current
+ // So, we must cause it to disableFail
+ try {
+ myPdp.setDesignated(false);
+ pdpsConnector.setDesignated(myPdp, false);
+ isDesignated = false;
+ stateManagementFeature.disableFailed();
+ } catch (Exception e) {
+ logger.error("DesignatedWaiter.run: myPdp: {} Caught Exception "
+ + "attempting to disableFail myPdp {}, message= {}",
+ myPdp.getPdpId(), myPdp.getPdpId(), e);
+ }
+ } else { //it is a remote PDP that is failed
+ logger.debug("\n\nDesignatedWaiter.run: PDP {} is not Current. "
+ + " Executing stateManagement.disableFailed(otherResourceName)\n\n",
+ pdp.getPdpId() );
+ // We found a PDP is designated but not current
+ // We already called standdown(pdp) which will change designated to false
+ // Now we need to disableFail it to get its states in synch. The standbyStatus
+ // should equal coldstandby
+ try {
+ stateManagementFeature.disableFailed(pdp.getPdpId());
+ } catch (Exception e) {
+ logger.error("DesignatedWaiter.run: for PDP {} Caught Exception attempting to "
+ + "disableFail({}), message= {}",
+ pdp.getPdpId(), pdp.getPdpId(), e);
+ }
+
+ }
+ }
+
+ private void pdpNotDesignatedCurrent(DroolsPdp pdp, boolean designatedPdpHasFailed, String standbyStatus,
+ List<DroolsPdp> listOfDesignated) {
+ if (!(StateManagement.HOT_STANDBY.equals(standbyStatus)
+ || StateManagement.COLD_STANDBY.equals(standbyStatus))) {
+ logger.debug("\n\nDesignatedWaiter.run: PDP {}"
+ + " is NOT designated but IS current and"
+ + " has a standbystatus= {}", pdp.getPdpId(), standbyStatus);
+ // Since it is current, we assume it can adjust its own state.
+ // We will demote if it is myPdp
+ if (pdp.getPdpId().equals(myPdp.getPdpId())) {
+ //demote it
+ logger.debug("DesignatedWaiter.run: PDP {} going to "
+ + "setDesignated = false and calling stateManagement.demote",
+ pdp.getPdpId());
+ try {
+ //Keep the order like this.
+ //StateManagement is last since it triggers controller shutdown
+ pdpsConnector.setDesignated(myPdp, false);
+ myPdp.setDesignated(false);
+ isDesignated = false;
+ //This is definitely not a redundant call.
+ //It is attempting to correct a problem
+ stateManagementFeature.demote();
+ //recheck the standbystatus
+ standbyStatus = stateManagementFeature.getStandbyStatus(pdp.getPdpId());
+ } catch (Exception e) {
+ logger.error("DesignatedWaiter.run: myPdp: {} Caught Exception "
+ + "attempting to demote myPdp {}, message = {}", myPdp.getPdpId(),
+ myPdp.getPdpId(), e);
+ }
+
+ }
+ }
+ if (StateManagement.HOT_STANDBY.equals(standbyStatus) && designatedPdpHasFailed) {
+ //add it to the list
+ logger.debug("INFO: DesignatedWaiter.run: PDP= {}"
+ + " is not designated but is {} and designated PDP "
+ + "has failed. standbyStatus= {}", pdp.getPdpId(),
+ standbyStatus, standbyStatus);
+ listOfDesignated.add(pdp);
+ }
+ }
+
+ private void pdpNotDesignatedNotCurrent(DroolsPdp pdp, String standbyStatus) {
+ if (StateManagement.COLD_STANDBY.equals(standbyStatus)) {
+ return;
+ }
+
+ //stand it down
+ //disableFail it
+ pdpsConnector.standDownPdp(pdp.getPdpId());
+ if (pdp.getPdpId().equals(myPdp.getPdpId())) {
+ /*
+ * I don't actually know how this condition could
+ * happen, but if it did, we would want to declare it
+ * failed.
+ */
+ logger.debug("\n\nDesignatedWaiter.run: myPdp {} is !current and !designated, "
+ + " Executing stateManagement.disableFailed()\n\n",
+ myPdp.getPdpId());
+ // So, we must disableFail it
+ try {
+ //Keep the order like this.
+ //StateManagement is last since it triggers controller shutdown
+ pdpsConnector.setDesignated(myPdp, false);
+ myPdp.setDesignated(false);
+ isDesignated = false;
+ stateManagementFeature.disableFailed();
+ } catch (Exception e) {
+ logger.error("DesignatedWaiter.run: myPdp: {} Caught Exception attempting to "
+ + "disableFail myPdp {}, message= {}",
+ myPdp.getPdpId(), myPdp.getPdpId(), e);
+ }
+ } else { //it is remote
+ logger.debug("\n\nDesignatedWaiter.run: myPdp {} is !current and !designated, "
+ + " Executing stateManagement.disableFailed({})\n\n",
+ myPdp.getPdpId(), pdp.getPdpId());
+ // We already called standdown(pdp) which will change designated to false
+ // Now we need to disableFail it to get its states in sync.
+ // StandbyStatus = coldstandby
+ try {
+ stateManagementFeature.disableFailed(pdp.getPdpId());
+ } catch (Exception e) {
+ logger.error("DesignatedWaiter.run: for PDP {}"
+ + " Caught Exception attempting to disableFail({})"
+ + ", message=", pdp.getPdpId(), pdp.getPdpId(), e);
+ }
+ }
+ }
+
+ private void designateNoPdp() {
+ // Just to be sure the parameters are correctly set
+ myPdp.setDesignated(false);
+ pdpsConnector.setDesignated(myPdp,false);
+ isDesignated = false;
+
+ waitTimerLastRunDate = new Date();
+ logger.debug("DesignatedWaiter.run (designatedPdp == null) waitTimerLastRunDate = {}",
+ waitTimerLastRunDate);
+ myPdp.setUpdatedDate(waitTimerLastRunDate);
+ pdpsConnector.update(myPdp);
+ }
+
+ private void designateMyPdp() {
+ /*
+ * update function expects myPdp.isDesignated to be true.
+ */
+ try {
+ //Keep the order like this. StateManagement is last since it triggers controller init
+ myPdp.setDesignated(true);
+ myPdp.setDesignatedDate(new Date());
+ pdpsConnector.setDesignated(myPdp, true);
+ isDesignated = true;
+ String standbyStatus = stateManagementFeature.getStandbyStatus();
+ if (!standbyStatus.equals(StateManagement.PROVIDING_SERVICE)) {
+ /*
+ * Only call promote if it is not already in the right state. Don't worry about
+ * synching the lower level topic endpoint states. That is done by the
+ * refreshStateAudit.
+ * Note that we need to fetch the session list from 'mostRecentPrimary'
+ * at this point -- soon, 'mostRecentPrimary' will be set to this host.
+ */
+ //this.sessions = mostRecentPrimary.getSessions();
+ stateManagementFeature.promote();
+ }
+ } catch (Exception e) {
+ logger.error("ERROR: DesignatedWaiter.run: Caught Exception attempting to promote PDP={}"
+ + ", message=", myPdp.getPdpId(), e);
+ myPdp.setDesignated(false);
+ pdpsConnector.setDesignated(myPdp,false);
+ isDesignated = false;
+ //If you can't promote it, demote it
+ try {
+ String standbyStatus = stateManagementFeature.getStandbyStatus();
+ if (!(standbyStatus.equals(StateManagement.HOT_STANDBY)
+ || standbyStatus.equals(StateManagement.COLD_STANDBY))) {
+ /*
+ * Only call demote if it is not already in the right state. Don't worry about
+ * synching the lower level topic endpoint states. That is done by the
+ * refreshStateAudit.
+ */
+ stateManagementFeature.demote();
+ }
+ } catch (Exception e1) {
+ logger.error("ERROR: DesignatedWaiter.run: Caught StandbyStatusException "
+ + "attempting to promote then demote PDP={}, message=",
+ myPdp.getPdpId(), e1);
+ }
+
+ }
+ waitTimerLastRunDate = new Date();
+ logger.debug("DesignatedWaiter.run (designatedPdp.getPdpId().equals(myPdp.getPdpId())) "
+ + "waitTimerLastRunDate = " + waitTimerLastRunDate);
+ myPdp.setUpdatedDate(waitTimerLastRunDate);
+ pdpsConnector.update(myPdp);
+ }
}
/**
@@ -621,53 +657,27 @@ public class DroolsPdpsElectionHandler implements ThreadRunningChecker {
* @return drools pdp object
*/
public DroolsPdp computeMostRecentPrimary(Collection<DroolsPdp> pdps, List<DroolsPdp> listOfDesignated) {
- boolean containsDesignated = false;
- for (DroolsPdp pdp : listOfDesignated) {
- if (pdp.isDesignated()) {
- containsDesignated = true;
- }
- }
+ boolean containsDesignated = listOfDesignated.stream().anyMatch(DroolsPdp::isDesignated);
+
DroolsPdp mostRecentPrimary = new DroolsPdpImpl(null, true, 1, new Date(0));
mostRecentPrimary.setSite(null);
logger.debug("DesignatedWaiter.run listOfDesignated.size() = {}", listOfDesignated.size());
+
if (listOfDesignated.size() <= 1) {
logger.debug("DesignatedWainter.run: listOfDesignated.size <=1");
//Only one or none is designated or hot standby. Choose the latest designated date
- for (DroolsPdp pdp : pdps) {
- logger.debug("DesignatedWaiter.run pdp = {}"
- + " pdp.getDesignatedDate() = {}",
- pdp.getPdpId(), pdp.getDesignatedDate());
- if (pdp.getDesignatedDate().compareTo(mostRecentPrimary.getDesignatedDate()) > 0) {
- mostRecentPrimary = pdp;
- logger.debug(RUN_PRIMARY_MSG, mostRecentPrimary.getPdpId());
- }
- }
+ mostRecentPrimary = getLatestDesignated(pdps, mostRecentPrimary);
+
} else if (listOfDesignated.size() == pdps.size()) {
logger.debug("DesignatedWainter.run: listOfDesignated.size = pdps.size() which is {}", pdps.size());
//They are all designated or all hot standby.
- mostRecentPrimary = null;
- for (DroolsPdp pdp : pdps) {
- if (mostRecentPrimary == null) {
- mostRecentPrimary = pdp;
- continue;
- }
- if (containsDesignated) { //Choose the site of the first designated date
- if (pdp.getDesignatedDate().compareTo(mostRecentPrimary.getDesignatedDate()) < 0) {
- mostRecentPrimary = pdp;
- logger.debug(RUN_PRIMARY_MSG, mostRecentPrimary.getPdpId());
- }
- } else { //Choose the site with the latest designated date
- if (pdp.getDesignatedDate().compareTo(mostRecentPrimary.getDesignatedDate()) > 0) {
- mostRecentPrimary = pdp;
- logger.debug(RUN_PRIMARY_MSG, mostRecentPrimary.getPdpId());
- }
- }
- }
+ mostRecentPrimary = getBestDesignated(pdps, containsDesignated);
+
} else {
logger.debug("DesignatedWainter.run: Some but not all are designated or hot standby. ");
+ logger.debug("DesignatedWainter.run: containsDesignated = {}", containsDesignated);
//Some but not all are designated or hot standby.
if (containsDesignated) {
- logger.debug("DesignatedWainter.run: containsDesignated = {}", containsDesignated);
/*
* The list only contains designated. This is a problem. It is most likely a race
* condition that resulted in two thinking they should be designated. Choose the
@@ -675,29 +685,66 @@ public class DroolsPdpsElectionHandler implements ThreadRunningChecker {
* This should be the site that had the last designation before this race condition
* occurred.
*/
- for (DroolsPdp pdp : pdps) {
- if (listOfDesignated.contains(pdp)) {
- continue; //Don't consider this entry
- }
- if (pdp.getDesignatedDate().compareTo(mostRecentPrimary.getDesignatedDate()) > 0) {
- mostRecentPrimary = pdp;
- logger.debug(RUN_PRIMARY_MSG, mostRecentPrimary.getPdpId());
- }
- }
+ mostRecentPrimary = getLatestUndesignated(pdps, mostRecentPrimary, listOfDesignated);
+
} else {
- logger.debug("DesignatedWainter.run: containsDesignated = {}", containsDesignated);
//The list only contains hot standby. Choose the site of the latest designated date
- for (DroolsPdp pdp : pdps) {
- if (pdp.getDesignatedDate().compareTo(mostRecentPrimary.getDesignatedDate()) > 0) {
- mostRecentPrimary = pdp;
- logger.debug(RUN_PRIMARY_MSG, mostRecentPrimary.getPdpId());
- }
+ mostRecentPrimary = getLatestDesignated(pdps, mostRecentPrimary);
+ }
+ }
+ return mostRecentPrimary;
+ }
+
+ private DroolsPdp getBestDesignated(Collection<DroolsPdp> pdps, boolean containsDesignated) {
+ DroolsPdp mostRecentPrimary;
+ mostRecentPrimary = null;
+ for (DroolsPdp pdp : pdps) {
+ if (mostRecentPrimary == null) {
+ mostRecentPrimary = pdp;
+ continue;
+ }
+ if (containsDesignated) { //Choose the site of the first designated date
+ if (pdp.getDesignatedDate().compareTo(mostRecentPrimary.getDesignatedDate()) < 0) {
+ mostRecentPrimary = pdp;
+ logger.debug(RUN_PRIMARY_MSG, mostRecentPrimary.getPdpId());
+ }
+ } else { //Choose the site with the latest designated date
+ if (pdp.getDesignatedDate().compareTo(mostRecentPrimary.getDesignatedDate()) > 0) {
+ mostRecentPrimary = pdp;
+ logger.debug(RUN_PRIMARY_MSG, mostRecentPrimary.getPdpId());
}
}
}
return mostRecentPrimary;
}
+ private DroolsPdp getLatestUndesignated(Collection<DroolsPdp> pdps, DroolsPdp mostRecentPrimary,
+ List<DroolsPdp> listOfDesignated) {
+ for (DroolsPdp pdp : pdps) {
+ if (listOfDesignated.contains(pdp)) {
+ continue; //Don't consider this entry
+ }
+ if (pdp.getDesignatedDate().compareTo(mostRecentPrimary.getDesignatedDate()) > 0) {
+ mostRecentPrimary = pdp;
+ logger.debug(RUN_PRIMARY_MSG, mostRecentPrimary.getPdpId());
+ }
+ }
+ return mostRecentPrimary;
+ }
+
+ private DroolsPdp getLatestDesignated(Collection<DroolsPdp> pdps, DroolsPdp mostRecentPrimary) {
+ for (DroolsPdp pdp : pdps) {
+ logger.debug("DesignatedWaiter.run pdp = {}"
+ + " pdp.getDesignatedDate() = {}",
+ pdp.getPdpId(), pdp.getDesignatedDate());
+ if (pdp.getDesignatedDate().compareTo(mostRecentPrimary.getDesignatedDate()) > 0) {
+ mostRecentPrimary = pdp;
+ logger.debug(RUN_PRIMARY_MSG, mostRecentPrimary.getPdpId());
+ }
+ }
+ return mostRecentPrimary;
+ }
+
/**
* Compue designated pdp.
*
@@ -706,125 +753,148 @@ public class DroolsPdpsElectionHandler implements ThreadRunningChecker {
* @return drools pdp object
*/
public DroolsPdp computeDesignatedPdp(List<DroolsPdp> listOfDesignated, DroolsPdp mostRecentPrimary) {
- DroolsPdp designatedPdp = null;
- DroolsPdp lowestPriorityPdp = null;
- if (listOfDesignated.size() > 1) {
- logger.debug("DesignatedWaiter.run: myPdp: {} listOfDesignated.size(): {}", myPdp.getPdpId(),
- listOfDesignated.size());
- DroolsPdp rejectedPdp = null;
- DroolsPdp lowestPrioritySameSite = null;
- DroolsPdp lowestPriorityDifferentSite = null;
- for (DroolsPdp pdp : listOfDesignated) {
- // We need to determine if another PDP is the lowest priority
- if (nullSafeEquals(pdp.getSite(),mostRecentPrimary.getSite())) {
- if (lowestPrioritySameSite == null) {
- if (lowestPriorityDifferentSite != null) {
- rejectedPdp = lowestPriorityDifferentSite;
- }
- lowestPrioritySameSite = pdp;
- } else {
- if (pdp.getPdpId().equals((lowestPrioritySameSite.getPdpId()))) {
- continue;//nothing to compare
- }
- if (pdp.comparePriority(lowestPrioritySameSite) < 0) {
- logger.debug("\nDesignatedWaiter.run: myPdp {} listOfDesignated pdp ID: {}"
- + " has lower priority than pdp ID: {}",myPdp.getPdpId(), pdp.getPdpId(),
- lowestPrioritySameSite.getPdpId());
- //we need to reject lowestPrioritySameSite
- rejectedPdp = lowestPrioritySameSite;
- lowestPrioritySameSite = pdp;
- } else {
- //we need to reject pdp and keep lowestPrioritySameSite
- logger.debug("\nDesignatedWaiter.run: myPdp {} listOfDesignated pdp ID: {} "
- + " has higher priority than pdp ID: {}", myPdp.getPdpId(),pdp.getPdpId(),
- lowestPrioritySameSite.getPdpId());
- rejectedPdp = pdp;
- }
- }
- } else {
- if (lowestPrioritySameSite != null) {
- //if we already have a candidate for same site, we don't want to bother with different sites
- rejectedPdp = pdp;
- } else {
- if (lowestPriorityDifferentSite == null) {
- lowestPriorityDifferentSite = pdp;
- continue;
- }
- if (pdp.getPdpId().equals((lowestPriorityDifferentSite.getPdpId()))) {
- continue;//nothing to compare
- }
- if (pdp.comparePriority(lowestPriorityDifferentSite) < 0) {
- logger.debug("\nDesignatedWaiter.run: myPdp {} listOfDesignated pdp ID: {}"
- + " has lower priority than pdp ID: {}", myPdp.getPdpId(), pdp.getPdpId(),
- lowestPriorityDifferentSite.getPdpId());
- //we need to reject lowestPriorityDifferentSite
- rejectedPdp = lowestPriorityDifferentSite;
- lowestPriorityDifferentSite = pdp;
- } else {
- //we need to reject pdp and keep lowestPriorityDifferentSite
- logger.debug("\nDesignatedWaiter.run: myPdp {} listOfDesignated pdp ID: {}"
- + " has higher priority than pdp ID: {}", myPdp.getPdpId(), pdp.getPdpId(),
- lowestPriorityDifferentSite.getPdpId());
- rejectedPdp = pdp;
- }
- }
+ if (listOfDesignated.isEmpty()) {
+ logger.debug("\nDesignatedWaiter.run: myPdp: {} listOfDesignated is: EMPTY.", myPdp.getPdpId());
+ return null;
+ }
+
+ if (listOfDesignated.size() == 1) {
+ logger.debug("\nDesignatedWaiter.run: myPdp: {} listOfDesignated "
+ + "has ONE entry. PDP ID: {}", myPdp.getPdpId(), listOfDesignated.get(0).getPdpId());
+ return listOfDesignated.get(0);
+ }
+
+ logger.debug("DesignatedWaiter.run: myPdp: {} listOfDesignated.size(): {}", myPdp.getPdpId(),
+ listOfDesignated.size());
+ DesignatedData data = new DesignatedData();
+ for (DroolsPdp pdp : listOfDesignated) {
+ DroolsPdp rejectedPdp;
+
+ // We need to determine if another PDP is the lowest priority
+ if (nullSafeEquals(pdp.getSite(), mostRecentPrimary.getSite())) {
+ rejectedPdp = data.compareSameSite(pdp);
+ } else {
+ rejectedPdp = data.compareDifferentSite(pdp);
+ }
+ // If the rejectedPdp is myPdp, we need to stand it down and demote it. Each pdp is responsible
+ // for demoting itself
+ if (rejectedPdp != null && nullSafeEquals(rejectedPdp.getPdpId(),myPdp.getPdpId())) {
+ logger.debug("\n\nDesignatedWaiter.run: myPdp: {} listOfDesignated myPdp ID: {}"
+ + " is NOT the lowest priority. Executing stateManagement.demote()\n\n",
+ myPdp.getPdpId(),
+ myPdp.getPdpId());
+ // We found that myPdp is on the listOfDesignated and it is not the lowest priority
+ // So, we must demote it
+ demoteMyPdp();
+ }
+ } //end: for(DroolsPdp pdp : listOfDesignated)
+
+ DroolsPdp lowestPriorityPdp = data.getLowestPriority();
+
+ //now we have a valid value for lowestPriorityPdp
+ logger.debug("\n\nDesignatedWaiter.run: myPdp: {} listOfDesignated "
+ + "found the LOWEST priority pdp ID: {} "
+ + " It is now the designatedPpd from the perspective of myPdp ID: {} \n\n",
+ myPdp.getPdpId(), lowestPriorityPdp.getPdpId(), myPdp);
+ return lowestPriorityPdp;
+
+ }
+
+ private class DesignatedData {
+ private DroolsPdp lowestPrioritySameSite = null;
+ private DroolsPdp lowestPriorityDifferentSite = null;
+
+ private DroolsPdp compareSameSite(DroolsPdp pdp) {
+ if (lowestPrioritySameSite == null) {
+ if (lowestPriorityDifferentSite != null) {
+ //we need to reject lowestPriorityDifferentSite
+ DroolsPdp rejectedPdp = lowestPriorityDifferentSite;
+ lowestPriorityDifferentSite = pdp;
+ return rejectedPdp;
}
- // If the rejectedPdp is myPdp, we need to stand it down and demote it. Each pdp is responsible
- // for demoting itself
- if (rejectedPdp != null && nullSafeEquals(rejectedPdp.getPdpId(),myPdp.getPdpId())) {
- logger.debug("\n\nDesignatedWaiter.run: myPdp: {} listOfDesignated myPdp ID: {}"
- + " is NOT the lowest priority. Executing stateManagement.demote()\n\n",
- myPdp.getPdpId(),
- myPdp.getPdpId());
- // We found that myPdp is on the listOfDesignated and it is not the lowest priority
- // So, we must demote it
- try {
- //Keep the order like this. StateManagement is last since it triggers controller shutdown
- myPdp.setDesignated(false);
- pdpsConnector.setDesignated(myPdp, false);
- isDesignated = false;
- String standbyStatus = stateManagementFeature.getStandbyStatus();
- if (!(standbyStatus.equals(StateManagement.HOT_STANDBY)
- || standbyStatus.equals(StateManagement.COLD_STANDBY))) {
- /*
- * Only call demote if it is not already in the right state. Don't worry about
- * synching the lower level topic endpoint states. That is done by the
- * refreshStateAudit.
- */
- stateManagementFeature.demote();
- }
- } catch (Exception e) {
- myPdp.setDesignated(false);
- pdpsConnector.setDesignated(myPdp, false);
- isDesignated = false;
- logger.error("DesignatedWaiter.run: myPdp: {} Caught Exception attempting to "
- + "demote myPdp {} myPdp.getPdpId(), message= {}", myPdp.getPdpId(),
- e);
- }
+ lowestPrioritySameSite = pdp;
+ return null;
+ } else {
+ if (pdp.getPdpId().equals((lowestPrioritySameSite.getPdpId()))) {
+ return null;//nothing to compare
}
- } //end: for(DroolsPdp pdp : listOfDesignated)
+ if (pdp.comparePriority(lowestPrioritySameSite) < 0) {
+ logger.debug("\nDesignatedWaiter.run: myPdp {} listOfDesignated pdp ID: {}"
+ + " has lower priority than pdp ID: {}",myPdp.getPdpId(), pdp.getPdpId(),
+ lowestPrioritySameSite.getPdpId());
+ //we need to reject lowestPrioritySameSite
+ DroolsPdp rejectedPdp = lowestPrioritySameSite;
+ lowestPrioritySameSite = pdp;
+ return rejectedPdp;
+ } else {
+ //we need to reject pdp and keep lowestPrioritySameSite
+ logger.debug("\nDesignatedWaiter.run: myPdp {} listOfDesignated pdp ID: {} "
+ + " has higher priority than pdp ID: {}", myPdp.getPdpId(),pdp.getPdpId(),
+ lowestPrioritySameSite.getPdpId());
+ return pdp;
+ }
+ }
+ }
+
+ private DroolsPdp compareDifferentSite(DroolsPdp pdp) {
if (lowestPrioritySameSite != null) {
- lowestPriorityPdp = lowestPrioritySameSite;
+ //if we already have a candidate for same site, we don't want to bother with different sites
+ return pdp;
} else {
- lowestPriorityPdp = lowestPriorityDifferentSite;
+ if (lowestPriorityDifferentSite == null) {
+ lowestPriorityDifferentSite = pdp;
+ return null;
+ }
+ if (pdp.getPdpId().equals((lowestPriorityDifferentSite.getPdpId()))) {
+ return null;//nothing to compare
+ }
+ if (pdp.comparePriority(lowestPriorityDifferentSite) < 0) {
+ logger.debug("\nDesignatedWaiter.run: myPdp {} listOfDesignated pdp ID: {}"
+ + " has lower priority than pdp ID: {}", myPdp.getPdpId(), pdp.getPdpId(),
+ lowestPriorityDifferentSite.getPdpId());
+ //we need to reject lowestPriorityDifferentSite
+ DroolsPdp rejectedPdp = lowestPriorityDifferentSite;
+ lowestPriorityDifferentSite = pdp;
+ return rejectedPdp;
+ } else {
+ //we need to reject pdp and keep lowestPriorityDifferentSite
+ logger.debug("\nDesignatedWaiter.run: myPdp {} listOfDesignated pdp ID: {}"
+ + " has higher priority than pdp ID: {}", myPdp.getPdpId(), pdp.getPdpId(),
+ lowestPriorityDifferentSite.getPdpId());
+ return pdp;
+ }
}
- //now we have a valid value for lowestPriorityPdp
- logger.debug("\n\nDesignatedWaiter.run: myPdp: {} listOfDesignated "
- + "found the LOWEST priority pdp ID: {} "
- + " It is now the designatedPpd from the perspective of myPdp ID: {} \n\n",
- myPdp.getPdpId(), lowestPriorityPdp.getPdpId(), myPdp);
- designatedPdp = lowestPriorityPdp;
-
- } else if (listOfDesignated.isEmpty()) {
- logger.debug("\nDesignatedWaiter.run: myPdp: {} listOfDesignated is: EMPTY.", myPdp.getPdpId());
- designatedPdp = null;
- } else { //only one in listOfDesignated
- logger.debug("\nDesignatedWaiter.run: myPdp: {} listOfDesignated "
- + "has ONE entry. PDP ID: {}", myPdp.getPdpId(), listOfDesignated.get(0).getPdpId());
- designatedPdp = listOfDesignated.get(0);
}
- return designatedPdp;
+ private DroolsPdp getLowestPriority() {
+ return (lowestPrioritySameSite != null ? lowestPrioritySameSite : lowestPriorityDifferentSite);
+ }
+ }
+
+ private void demoteMyPdp() {
+ try {
+ //Keep the order like this. StateManagement is last since it triggers controller shutdown
+ myPdp.setDesignated(false);
+ pdpsConnector.setDesignated(myPdp, false);
+ isDesignated = false;
+ String standbyStatus = stateManagementFeature.getStandbyStatus();
+ if (!(standbyStatus.equals(StateManagement.HOT_STANDBY)
+ || standbyStatus.equals(StateManagement.COLD_STANDBY))) {
+ /*
+ * Only call demote if it is not already in the right state. Don't worry about
+ * synching the lower level topic endpoint states. That is done by the
+ * refreshStateAudit.
+ */
+ stateManagementFeature.demote();
+ }
+ } catch (Exception e) {
+ myPdp.setDesignated(false);
+ pdpsConnector.setDesignated(myPdp, false);
+ isDesignated = false;
+ logger.error("DesignatedWaiter.run: myPdp: {} Caught Exception attempting to "
+ + "demote myPdp {} myPdp.getPdpId(), message= {}", myPdp.getPdpId(),
+ e);
+ }
}
private class TimerUpdateClass extends TimerTask {
diff --git a/feature-active-standby-management/src/main/java/org/onap/policy/drools/activestandby/JpaDroolsPdpsConnector.java b/feature-active-standby-management/src/main/java/org/onap/policy/drools/activestandby/JpaDroolsPdpsConnector.java
index 078c891f..ed53f55c 100644
--- a/feature-active-standby-management/src/main/java/org/onap/policy/drools/activestandby/JpaDroolsPdpsConnector.java
+++ b/feature-active-standby-management/src/main/java/org/onap/policy/drools/activestandby/JpaDroolsPdpsConnector.java
@@ -61,18 +61,19 @@ public class JpaDroolsPdpsConnector implements DroolsPdpsConnector {
.setFlushMode(FlushModeType.COMMIT).getResultList();
LinkedList<DroolsPdp> droolsPdpsReturnList = new LinkedList<>();
for (Object o : droolsPdpsList) {
- if (o instanceof DroolsPdp) {
- //Make sure it is not a cached version
- em.refresh((DroolsPdpEntity)o);
- droolsPdpsReturnList.add((DroolsPdp)o);
- if (logger.isDebugEnabled()) {
- DroolsPdp droolsPdp = (DroolsPdp)o;
- logger.debug("getDroolsPdps: PDP= {}"
- + ", isDesignated= {}"
- + ", updatedDate= {}"
- + ", priority= {}", droolsPdp.getPdpId(), droolsPdp.isDesignated(),
- droolsPdp.getUpdatedDate(), droolsPdp.getPriority());
- }
+ if (!(o instanceof DroolsPdp)) {
+ continue;
+ }
+ //Make sure it is not a cached version
+ DroolsPdp droolsPdp = (DroolsPdp)o;
+ em.refresh(droolsPdp);
+ droolsPdpsReturnList.add(droolsPdp);
+ if (logger.isDebugEnabled()) {
+ logger.debug("getDroolsPdps: PDP= {}"
+ + ", isDesignated= {}"
+ + ", updatedDate= {}"
+ + ", priority= {}", droolsPdp.getPdpId(), droolsPdp.isDesignated(),
+ droolsPdp.getUpdatedDate(), droolsPdp.getPriority());
}
}
try {
@@ -233,14 +234,7 @@ public class JpaDroolsPdpsConnector implements DroolsPdpsConnector {
+ " found, designated= {}"
+ ", setting to {}", pdp.getPdpId(), droolsPdpEntity.isDesignated(),
designated);
- droolsPdpEntity.setDesignated(designated);
- if (designated) {
- em.refresh(droolsPdpEntity); //make sure we get the DB value
- if (!droolsPdpEntity.isDesignated()) {
- droolsPdpEntity.setDesignatedDate(new Date());
- }
-
- }
+ setPdpDesignation(em, droolsPdpEntity, designated);
em.getTransaction().commit();
} else {
logger.error("setDesignated: PDP={}"
@@ -256,6 +250,17 @@ public class JpaDroolsPdpsConnector implements DroolsPdpsConnector {
}
+ private void setPdpDesignation(EntityManager em, DroolsPdpEntity droolsPdpEntity, boolean designated) {
+ droolsPdpEntity.setDesignated(designated);
+ if (designated) {
+ em.refresh(droolsPdpEntity); //make sure we get the DB value
+ if (!droolsPdpEntity.isDesignated()) {
+ droolsPdpEntity.setDesignatedDate(new Date());
+ }
+
+ }
+ }
+
@Override
public void standDownPdp(String pdpId) {
diff --git a/feature-active-standby-management/src/main/java/org/onap/policy/drools/activestandby/PmStandbyStateChangeNotifier.java b/feature-active-standby-management/src/main/java/org/onap/policy/drools/activestandby/PmStandbyStateChangeNotifier.java
index 3f4ae557..7669cc22 100644
--- a/feature-active-standby-management/src/main/java/org/onap/policy/drools/activestandby/PmStandbyStateChangeNotifier.java
+++ b/feature-active-standby-management/src/main/java/org/onap/policy/drools/activestandby/PmStandbyStateChangeNotifier.java
@@ -125,138 +125,163 @@ public class PmStandbyStateChangeNotifier extends StateChangeNotifier {
if (standbyStatus == null || standbyStatus.equals(StateManagement.NULL_VALUE)) {
logger.debug("handleStateChange: standbyStatus is null; standing down PDP={}", pdpId);
- if (previousStandbyStatus.equals(StateManagement.NULL_VALUE)) {
- // We were just here and did this successfully
- logger.debug("handleStateChange: "
- + "Is returning because standbyStatus is null and was previously 'null'; PDP={}",
- pdpId);
- return;
- }
- isWaitingForActivation = false;
- try {
- logger.debug("handleStateChange: null: cancelling delayActivationTimer.");
- cancelTimer();
- // Only want to lock the endpoints, not the controllers.
- getPolicyEngineManager().deactivate();
- // The operation was fully successful, but you cannot assign it a real null value
- // because later we might try to execute previousStandbyStatus.equals() and get
- // a null pointer exception.
- previousStandbyStatus = StateManagement.NULL_VALUE;
- } catch (Exception e) {
- logger.warn("handleStateChange: standbyStatus == null caught exception: ", e);
- }
+ standDownPdpNull(pdpId);
+
} else if (standbyStatus.equals(StateManagement.HOT_STANDBY)
|| standbyStatus.equals(StateManagement.COLD_STANDBY)) {
logger.debug("handleStateChange: standbyStatus={}; standing down PDP={}", standbyStatus, pdpId);
- if (previousStandbyStatus.equals(PmStandbyStateChangeNotifier.HOTSTANDBY_OR_COLDSTANDBY)) {
- // We were just here and did this successfully
- logger.debug("handleStateChange: Is returning because standbyStatus is {}"
- + " and was previously {}; PDP= {}", standbyStatus, previousStandbyStatus, pdpId);
- return;
- }
- isWaitingForActivation = false;
- try {
- logger.debug("handleStateChange: HOT_STNDBY || COLD_STANDBY: cancelling delayActivationTimer.");
- cancelTimer();
- // Only want to lock the endpoints, not the controllers.
- getPolicyEngineManager().deactivate();
- // The operation was fully successful
- previousStandbyStatus = PmStandbyStateChangeNotifier.HOTSTANDBY_OR_COLDSTANDBY;
- } catch (Exception e) {
- logger.warn("handleStateChange: standbyStatus = {} caught exception: {}", standbyStatus, e.getMessage(),
- e);
- }
+ standDownPdp(pdpId, standbyStatus);
} else if (standbyStatus.equals(StateManagement.PROVIDING_SERVICE)) {
logger.debug("handleStateChange: standbyStatus= {} scheduling activation of PDP={}", standbyStatus,
pdpId);
- if (previousStandbyStatus.equals(StateManagement.PROVIDING_SERVICE)) {
- // We were just here and did this successfully
- logger.debug("handleStateChange: Is returning because standbyStatus is {}"
- + "and was previously {}; PDP={}", standbyStatus, previousStandbyStatus, pdpId);
- return;
- }
- try {
- // UnLock all the endpoints
- logger.debug("handleStateChange: standbyStatus={}; controllers must be unlocked.", standbyStatus);
- /*
- * Only endpoints should be unlocked. Controllers have not been locked. Because,
- * sometimes, it is possible for more than one PDP-D to become active (race
- * conditions) we need to delay the activation of the topic endpoint interfaces to
- * give the election algorithm time to resolve the conflict.
- */
- logger.debug("handleStateChange: PROVIDING_SERVICE isWaitingForActivation= {}",
- isWaitingForActivation);
-
- // Delay activation for 2*pdpUpdateInterval+2000 ms in case of an election handler
- // conflict.
- // You could have multiple election handlers thinking they can take over.
-
- // First let's check that the timer has not died
- if (isWaitingForActivation) {
- logger.debug("handleStateChange: PROVIDING_SERVICE isWaitingForActivation = {}",
- isWaitingForActivation);
- long now = new Date().getTime();
- long waitTimeMs = now - startTimeWaitingForActivationMs;
- if (waitTimeMs > 3 * waitInterval) {
- logger.debug("handleStateChange: PROVIDING_SERVICE looks like the activation wait timer "
- + "may be hung, waitTimeMs = {} and allowable waitInterval = {}"
- + " Checking whether it is currently in activation. isNowActivating = {}",
- waitTimeMs, waitInterval, isNowActivating);
- // Now check that it is not currently executing an activation
- if (!isNowActivating) {
- logger.debug("handleStateChange: PROVIDING_SERVICE looks like the activation "
- + "wait timer died");
- // This will assure the timer is cancelled and rescheduled.
- isWaitingForActivation = false;
- }
- }
+ schedulePdpActivation(pdpId, standbyStatus);
- }
+ } else {
+ logger.error("handleStateChange: Unsupported standbyStatus={}; standing down PDP={}", standbyStatus, pdpId);
+ standDownPdpUnsupported(pdpId, standbyStatus);
+ }
- if (!isWaitingForActivation) {
- // Just in case there is an old timer hanging around
- logger.debug("handleStateChange: PROVIDING_SERVICE cancelling delayActivationTimer.");
- cancelTimer();
- delayActivateTimer = makeTimer();
- // delay the activate so the DesignatedWaiter can run twice
- delayActivateTimer.schedule(new DelayActivateClass(), waitInterval);
- isWaitingForActivation = true;
- startTimeWaitingForActivationMs = new Date().getTime();
- logger.debug("handleStateChange: PROVIDING_SERVICE scheduling delayActivationTimer in {} ms",
- waitInterval);
- } else {
- logger.debug("handleStateChange: PROVIDING_SERVICE delayActivationTimer is "
- + "waiting for activation.");
- }
+ logger.debug("handleStateChange: Exiting");
+ }
- } catch (Exception e) {
- logger.warn("handleStateChange: PROVIDING_SERVICE standbyStatus == providingservice caught exception: ",
- e);
- }
+ private void standDownPdpNull(String pdpId) {
+ if (previousStandbyStatus.equals(StateManagement.NULL_VALUE)) {
+ // We were just here and did this successfully
+ logger.debug("handleStateChange: "
+ + "Is returning because standbyStatus is null and was previously 'null'; PDP={}",
+ pdpId);
+ return;
+ }
- } else {
- logger.error("handleStateChange: Unsupported standbyStatus={}; standing down PDP={}", standbyStatus, pdpId);
- if (previousStandbyStatus.equals(PmStandbyStateChangeNotifier.UNSUPPORTED)) {
- // We were just here and did this successfully
- logger.debug("handleStateChange: Is returning because standbyStatus is "
- + "UNSUPPORTED and was previously {}; PDP={}", previousStandbyStatus, pdpId);
- return;
- }
+ isWaitingForActivation = false;
+ try {
+ logger.debug("handleStateChange: null: cancelling delayActivationTimer.");
+ cancelTimer();
// Only want to lock the endpoints, not the controllers.
- isWaitingForActivation = false;
- try {
- logger.debug("handleStateChange: unsupported standbystatus: cancelling delayActivationTimer.");
+ getPolicyEngineManager().deactivate();
+ // The operation was fully successful, but you cannot assign it a real null value
+ // because later we might try to execute previousStandbyStatus.equals() and get
+ // a null pointer exception.
+ previousStandbyStatus = StateManagement.NULL_VALUE;
+ } catch (Exception e) {
+ logger.warn("handleStateChange: standbyStatus == null caught exception: ", e);
+ }
+ }
+
+ private void standDownPdp(String pdpId, String standbyStatus) {
+ if (previousStandbyStatus.equals(PmStandbyStateChangeNotifier.HOTSTANDBY_OR_COLDSTANDBY)) {
+ // We were just here and did this successfully
+ logger.debug("handleStateChange: Is returning because standbyStatus is {}"
+ + " and was previously {}; PDP= {}", standbyStatus, previousStandbyStatus, pdpId);
+ return;
+ }
+
+ isWaitingForActivation = false;
+ try {
+ logger.debug("handleStateChange: HOT_STNDBY || COLD_STANDBY: cancelling delayActivationTimer.");
+ cancelTimer();
+ // Only want to lock the endpoints, not the controllers.
+ getPolicyEngineManager().deactivate();
+ // The operation was fully successful
+ previousStandbyStatus = PmStandbyStateChangeNotifier.HOTSTANDBY_OR_COLDSTANDBY;
+ } catch (Exception e) {
+ logger.warn("handleStateChange: standbyStatus = {} caught exception: {}", standbyStatus, e.getMessage(),
+ e);
+ }
+ }
+
+ private void schedulePdpActivation(String pdpId, String standbyStatus) {
+ if (previousStandbyStatus.equals(StateManagement.PROVIDING_SERVICE)) {
+ // We were just here and did this successfully
+ logger.debug("handleStateChange: Is returning because standbyStatus is {}"
+ + "and was previously {}; PDP={}", standbyStatus, previousStandbyStatus, pdpId);
+ return;
+ }
+
+ try {
+ // UnLock all the endpoints
+ logger.debug("handleStateChange: standbyStatus={}; controllers must be unlocked.", standbyStatus);
+ /*
+ * Only endpoints should be unlocked. Controllers have not been locked. Because,
+ * sometimes, it is possible for more than one PDP-D to become active (race
+ * conditions) we need to delay the activation of the topic endpoint interfaces to
+ * give the election algorithm time to resolve the conflict.
+ */
+ logger.debug("handleStateChange: PROVIDING_SERVICE isWaitingForActivation= {}",
+ isWaitingForActivation);
+
+ // Delay activation for 2*pdpUpdateInterval+2000 ms in case of an election handler
+ // conflict.
+ // You could have multiple election handlers thinking they can take over.
+
+ // First let's check that the timer has not died
+ checkTimerStatus();
+
+ if (!isWaitingForActivation) {
+ // Just in case there is an old timer hanging around
+ logger.debug("handleStateChange: PROVIDING_SERVICE cancelling delayActivationTimer.");
cancelTimer();
- getPolicyEngineManager().deactivate();
- // We know the standbystatus is unsupported
- previousStandbyStatus = PmStandbyStateChangeNotifier.UNSUPPORTED;
- } catch (Exception e) {
- logger.warn("handleStateChange: Unsupported standbyStatus = {} " + "caught exception: {} ",
- standbyStatus, e.getMessage(), e);
+ delayActivateTimer = makeTimer();
+ // delay the activate so the DesignatedWaiter can run twice
+ delayActivateTimer.schedule(new DelayActivateClass(), waitInterval);
+ isWaitingForActivation = true;
+ startTimeWaitingForActivationMs = new Date().getTime();
+ logger.debug("handleStateChange: PROVIDING_SERVICE scheduling delayActivationTimer in {} ms",
+ waitInterval);
+ } else {
+ logger.debug("handleStateChange: PROVIDING_SERVICE delayActivationTimer is "
+ + "waiting for activation.");
}
+
+ } catch (Exception e) {
+ logger.warn("handleStateChange: PROVIDING_SERVICE standbyStatus == providingservice caught exception: ",
+ e);
+ }
+ }
+
+ private void checkTimerStatus() {
+ if (isWaitingForActivation) {
+ logger.debug("handleStateChange: PROVIDING_SERVICE isWaitingForActivation = {}",
+ isWaitingForActivation);
+ long now = new Date().getTime();
+ long waitTimeMs = now - startTimeWaitingForActivationMs;
+ if (waitTimeMs > 3 * waitInterval) {
+ logger.debug("handleStateChange: PROVIDING_SERVICE looks like the activation wait timer "
+ + "may be hung, waitTimeMs = {} and allowable waitInterval = {}"
+ + " Checking whether it is currently in activation. isNowActivating = {}",
+ waitTimeMs, waitInterval, isNowActivating);
+ // Now check that it is not currently executing an activation
+ if (!isNowActivating) {
+ logger.debug("handleStateChange: PROVIDING_SERVICE looks like the activation "
+ + "wait timer died");
+ // This will assure the timer is cancelled and rescheduled.
+ isWaitingForActivation = false;
+ }
+ }
+ }
+ }
+
+ private void standDownPdpUnsupported(String pdpId, String standbyStatus) {
+ if (previousStandbyStatus.equals(PmStandbyStateChangeNotifier.UNSUPPORTED)) {
+ // We were just here and did this successfully
+ logger.debug("handleStateChange: Is returning because standbyStatus is "
+ + "UNSUPPORTED and was previously {}; PDP={}", previousStandbyStatus, pdpId);
+ return;
+ }
+
+ // Only want to lock the endpoints, not the controllers.
+ isWaitingForActivation = false;
+ try {
+ logger.debug("handleStateChange: unsupported standbystatus: cancelling delayActivationTimer.");
+ cancelTimer();
+ getPolicyEngineManager().deactivate();
+ // We know the standbystatus is unsupported
+ previousStandbyStatus = PmStandbyStateChangeNotifier.UNSUPPORTED;
+ } catch (Exception e) {
+ logger.warn("handleStateChange: Unsupported standbyStatus = {} " + "caught exception: {} ",
+ standbyStatus, e.getMessage(), e);
}
- logger.debug("handleStateChange: Exiting");
}
private void cancelTimer() {
diff --git a/feature-simulators/src/main/java/org/onap/policy/drools/simulators/DMaaPSimulatorJaxRs.java b/feature-simulators/src/main/java/org/onap/policy/drools/simulators/DMaaPSimulatorJaxRs.java
index 8b653aae..dd0fb7b8 100644
--- a/feature-simulators/src/main/java/org/onap/policy/drools/simulators/DMaaPSimulatorJaxRs.java
+++ b/feature-simulators/src/main/java/org/onap/policy/drools/simulators/DMaaPSimulatorJaxRs.java
@@ -99,7 +99,7 @@ public class DMaaPSimulatorJaxRs {
return response;
}
- private String waitForNextMessageFromQueue(int timeout, String topicName) {
+ protected String waitForNextMessageFromQueue(int timeout, String topicName) {
try {
sleep(timeout);
if (queues.containsKey(topicName)) {
@@ -129,10 +129,7 @@ public class DMaaPSimulatorJaxRs {
@Consumes(MediaType.TEXT_PLAIN)
public String publish(@PathParam("topicName") String topicName, String body) {
BlockingQueue<String> queue = queues.computeIfAbsent(topicName, entry -> new LinkedBlockingQueue<>());
-
- if (!queue.offer(body)) {
- logger.warn("error on topic {}, failed to place body {} on queue", topicName, body);
- }
+ queue.offer(body);
return "";
}
diff --git a/feature-simulators/src/test/java/org/onap/policy/drools/simulators/DMaaPSimulatorJaxRsTest.java b/feature-simulators/src/test/java/org/onap/policy/drools/simulators/DMaaPSimulatorJaxRsTest.java
index 7200bdce..b1275004 100644
--- a/feature-simulators/src/test/java/org/onap/policy/drools/simulators/DMaaPSimulatorJaxRsTest.java
+++ b/feature-simulators/src/test/java/org/onap/policy/drools/simulators/DMaaPSimulatorJaxRsTest.java
@@ -21,6 +21,7 @@
package org.onap.policy.drools.simulators;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.doThrow;
import java.io.IOException;
@@ -30,6 +31,7 @@ import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
import javax.servlet.http.HttpServletResponse;
import org.junit.After;
import org.junit.Before;
@@ -130,6 +132,16 @@ public class DMaaPSimulatorJaxRsTest {
@Test
public void testWaitForNextMessageFromQueue() throws InterruptedException {
+ CountDownLatch waitCalled = new CountDownLatch(1);
+
+ sim = new DMaaPSimulatorJaxRs() {
+ @Override
+ protected String waitForNextMessageFromQueue(int timeout, String topicName) {
+ waitCalled.countDown();
+ return super.waitForNextMessageFromQueue(timeout, topicName);
+ }
+ };
+
BlockingQueue<String> queue = new LinkedBlockingQueue<>();
CountDownLatch latch1 = backgroundSubscribe(queue);
@@ -143,7 +155,7 @@ public class DMaaPSimulatorJaxRsTest {
* Must pause to prevent the topic from being created before subscribe() is
* invoked.
*/
- Thread.sleep(LONG_TIMEOUT_MS / 3);
+ assertTrue(waitCalled.await(1, TimeUnit.SECONDS));
// only publish one message
sim.publish(TOPIC, MESSAGE);
diff --git a/feature-state-management/src/main/java/org/onap/policy/drools/statemanagement/RepositoryAudit.java b/feature-state-management/src/main/java/org/onap/policy/drools/statemanagement/RepositoryAudit.java
index 9d39a7bf..dc0f3784 100644
--- a/feature-state-management/src/main/java/org/onap/policy/drools/statemanagement/RepositoryAudit.java
+++ b/feature-state-management/src/main/java/org/onap/policy/drools/statemanagement/RepositoryAudit.java
@@ -30,9 +30,9 @@ import java.nio.file.Path;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.LinkedList;
+import java.util.List;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -73,96 +73,172 @@ public class RepositoryAudit extends DroolsPdpIntegrityMonitor.AuditBase {
throws IOException, InterruptedException {
logger.debug("Running 'RepositoryAudit.invoke'");
- boolean isActive = true;
- // ignore errors by default
- boolean ignoreErrors = true;
- String repoAuditIsActive = StateManagementProperties.getProperty("repository.audit.is.active");
- String repoAuditIgnoreErrors =
- StateManagementProperties.getProperty("repository.audit.ignore.errors");
+ InvokeData data = new InvokeData();
+
logger.debug("RepositoryAudit.invoke: repoAuditIsActive = {}"
- + ", repoAuditIgnoreErrors = {}",repoAuditIsActive, repoAuditIgnoreErrors);
-
- if (repoAuditIsActive != null) {
- try {
- isActive = Boolean.parseBoolean(repoAuditIsActive.trim());
- } catch (NumberFormatException e) {
- logger.warn("RepositoryAudit.invoke: Ignoring invalid property: repository.audit.is.active = {}",
- repoAuditIsActive);
- }
- }
+ + ", repoAuditIgnoreErrors = {}",data.repoAuditIsActive, data.repoAuditIgnoreErrors);
- if (!isActive) {
- logger.info("RepositoryAudit.invoke: exiting because isActive = {}", isActive);
+ data.initIsActive();
+
+ if (!data.isActive) {
+ logger.info("RepositoryAudit.invoke: exiting because isActive = {}", data.isActive);
return;
}
- if (repoAuditIgnoreErrors != null) {
- try {
- ignoreErrors = Boolean.parseBoolean(repoAuditIgnoreErrors.trim());
- } catch (NumberFormatException e) {
- ignoreErrors = true;
- logger.warn("RepositoryAudit.invoke: Ignoring invalid property: repository.audit.ignore.errors = {}",
- repoAuditIgnoreErrors);
- }
- } else {
- ignoreErrors = true;
- }
+ data.initIgnoreErrors();
+ data.initTimeout();
- // Fetch repository information from 'IntegrityMonitorProperties'
- String repositoryId =
- StateManagementProperties.getProperty("repository.audit.id");
- String repositoryUrl =
- StateManagementProperties.getProperty("repository.audit.url");
- String repositoryUsername =
- StateManagementProperties.getProperty("repository.audit.username");
- String repositoryPassword =
- StateManagementProperties.getProperty("repository.audit.password");
- boolean upload =
- repositoryId != null && repositoryUrl != null
- && repositoryUsername != null && repositoryPassword != null;
+ /*
+ * 1) create temporary directory
+ */
+ data.dir = Files.createTempDirectory("auditRepo");
+ logger.info("RepositoryAudit: temporary directory = {}", data.dir);
- // used to incrementally construct response as problems occur
- // (empty = no problems)
- StringBuilder response = new StringBuilder();
-
- long timeoutInSeconds = DEFAULT_TIMEOUT;
- String timeoutString =
- StateManagementProperties.getProperty("repository.audit.timeout");
- if (timeoutString != null && !timeoutString.isEmpty()) {
- try {
- timeoutInSeconds = Long.valueOf(timeoutString);
- } catch (NumberFormatException e) {
- logger.error("RepositoryAudit: Invalid 'repository.audit.timeout' value: '{}'",
- timeoutString, e);
- if (!ignoreErrors) {
- response.append("Invalid 'repository.audit.timeout' value: '")
- .append(timeoutString).append("'\n");
- setResponse(response.toString());
- }
- }
+ // nested 'pom.xml' file and 'repo' directory
+ final Path pom = data.dir.resolve("pom.xml");
+ final Path repo = data.dir.resolve("repo");
+
+ /*
+ * 2) Create test file, and upload to repository
+ * (only if repository information is specified)
+ */
+ if (data.upload) {
+ data.uploadTestFile();
}
- // artifacts to be downloaded
- LinkedList<Artifact> artifacts = new LinkedList<>();
+ /*
+ * 3) create 'pom.xml' file in temporary directory
+ */
+ data.createPomFile(repo, pom);
/*
- * 1) create temporary directory
+ * 4) Invoke external 'mvn' process to do the downloads
*/
- Path dir = Files.createTempDirectory("auditRepo");
- logger.info("RepositoryAudit: temporary directory = {}", dir);
- // nested 'pom.xml' file and 'repo' directory
- final Path pom = dir.resolve("pom.xml");
- final Path repo = dir.resolve("repo");
+ // output file = ${dir}/out (this supports step '4a')
+ File output = data.dir.resolve("out").toFile();
+
+ // invoke process, and wait for response
+ int rval = data.runMaven(output);
/*
- * 2) Create test file, and upload to repository
+ * 4a) Check attempted and successful downloads from output file
+ * Note: at present, this step just generates log messages,
+ * but doesn't do any verification.
+ */
+ if (rval == 0 && output != null) {
+ generateDownloadLogs(output);
+ }
+
+ /*
+ * 5) Check the contents of the directory to make sure the downloads
+ * were successful
+ */
+ data.verifyDownloads(repo);
+
+ /*
+ * 6) Use 'curl' to delete the uploaded test file
* (only if repository information is specified)
*/
- String groupId = null;
- String artifactId = null;
- String version = null;
- if (upload) {
+ if (data.upload) {
+ data.deleteUploadedTestFile();
+ }
+
+ /*
+ * 7) Remove the temporary directory
+ */
+ Files.walkFileTree(data.dir, new RecursivelyDeleteDirectory());
+ }
+
+ private class InvokeData {
+ private boolean isActive = true;
+
+ // ignore errors by default
+ private boolean ignoreErrors = true;
+
+ private final String repoAuditIsActive;
+ private final String repoAuditIgnoreErrors;
+
+ private final String repositoryId;
+ private final String repositoryUrl;
+ private final String repositoryUsername;
+ private final String repositoryPassword;
+ private final boolean upload;
+
+ // used to incrementally construct response as problems occur
+ // (empty = no problems)
+ private final StringBuilder response = new StringBuilder();
+
+ private long timeoutInSeconds = DEFAULT_TIMEOUT;
+
+ private Path dir;
+
+ private String groupId = null;
+ private String artifactId = null;
+ private String version = null;
+
+ // artifacts to be downloaded
+ private final List<Artifact> artifacts = new LinkedList<>();
+
+ public InvokeData() {
+ repoAuditIsActive = StateManagementProperties.getProperty("repository.audit.is.active");
+ repoAuditIgnoreErrors = StateManagementProperties.getProperty("repository.audit.ignore.errors");
+
+ // Fetch repository information from 'IntegrityMonitorProperties'
+ repositoryId = StateManagementProperties.getProperty("repository.audit.id");
+ repositoryUrl = StateManagementProperties.getProperty("repository.audit.url");
+ repositoryUsername = StateManagementProperties.getProperty("repository.audit.username");
+ repositoryPassword = StateManagementProperties.getProperty("repository.audit.password");
+
+ upload = repositoryId != null && repositoryUrl != null
+ && repositoryUsername != null && repositoryPassword != null;
+ }
+
+ public void initIsActive() {
+ if (repoAuditIsActive != null) {
+ try {
+ isActive = Boolean.parseBoolean(repoAuditIsActive.trim());
+ } catch (NumberFormatException e) {
+ logger.warn("RepositoryAudit.invoke: Ignoring invalid property: repository.audit.is.active = {}",
+ repoAuditIsActive);
+ }
+ }
+ }
+
+ public void initIgnoreErrors() {
+ if (repoAuditIgnoreErrors != null) {
+ try {
+ ignoreErrors = Boolean.parseBoolean(repoAuditIgnoreErrors.trim());
+ } catch (NumberFormatException e) {
+ ignoreErrors = true;
+ logger.warn(
+ "RepositoryAudit.invoke: Ignoring invalid property: repository.audit.ignore.errors = {}",
+ repoAuditIgnoreErrors);
+ }
+ } else {
+ ignoreErrors = true;
+ }
+ }
+
+ public void initTimeout() {
+ String timeoutString =
+ StateManagementProperties.getProperty("repository.audit.timeout");
+ if (timeoutString != null && !timeoutString.isEmpty()) {
+ try {
+ timeoutInSeconds = Long.valueOf(timeoutString);
+ } catch (NumberFormatException e) {
+ logger.error("RepositoryAudit: Invalid 'repository.audit.timeout' value: '{}'",
+ timeoutString, e);
+ if (!ignoreErrors) {
+ response.append("Invalid 'repository.audit.timeout' value: '")
+ .append(timeoutString).append("'\n");
+ setResponse(response.toString());
+ }
+ }
+ }
+ }
+
+ private void uploadTestFile() throws IOException, InterruptedException {
groupId = "org.onap.policy.audit";
artifactId = "repository-audit";
version = "0." + System.currentTimeMillis();
@@ -204,155 +280,105 @@ public class RepositoryAudit extends DroolsPdpIntegrityMonitor.AuditBase {
}
}
- /*
- * 3) create 'pom.xml' file in temporary directory
- */
- artifacts.add(new Artifact("org.apache.maven/maven-embedder/3.2.2"));
-
- StringBuilder sb = new StringBuilder();
- sb.append("<project xmlns=\"http://maven.apache.org/POM/4.0.0\" xmlns:xsi=\"http://www.w3.org/2001/XMLSchema-instance\"\n"
- + " xsi:schemaLocation=\"http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd\">\n"
- + "\n"
- + " <modelVersion>4.0.0</modelVersion>\n"
- + " <groupId>empty</groupId>\n"
- + " <artifactId>empty</artifactId>\n"
- + " <version>1.0-SNAPSHOT</version>\n"
- + " <packaging>pom</packaging>\n"
- + "\n"
- + " <build>\n"
- + " <plugins>\n"
- + " <plugin>\n"
- + " <groupId>org.apache.maven.plugins</groupId>\n"
- + " <artifactId>maven-dependency-plugin</artifactId>\n"
- + " <version>2.10</version>\n"
- + " <executions>\n"
- + " <execution>\n"
- + " <id>copy</id>\n"
- + " <goals>\n"
- + " <goal>copy</goal>\n"
- + " </goals>\n"
- + " <configuration>\n"
- + " <localRepositoryDirectory>")
- .append(repo)
- .append("</localRepositoryDirectory>\n")
- .append(" <artifactItems>\n");
-
- for (Artifact artifact : artifacts) {
- // each artifact results in an 'artifactItem' element
- sb.append(" <artifactItem>\n"
- + " <groupId>")
- .append(artifact.groupId)
- .append("</groupId>\n"
- + " <artifactId>")
- .append(artifact.artifactId)
- .append("</artifactId>\n"
- + " <version>")
- .append(artifact.version)
- .append("</version>\n"
- + " <type>")
- .append(artifact.type)
- .append("</type>\n"
- + " </artifactItem>\n");
- }
- sb.append(" </artifactItems>\n"
- + " </configuration>\n"
- + " </execution>\n"
- + " </executions>\n"
- + " </plugin>\n"
- + " </plugins>\n"
- + " </build>\n"
- + "</project>\n");
-
- try (FileOutputStream fos = new FileOutputStream(pom.toFile())) {
- fos.write(sb.toString().getBytes());
- }
-
- /*
- * 4) Invoke external 'mvn' process to do the downloads
- */
-
- // output file = ${dir}/out (this supports step '4a')
- File output = dir.resolve("out").toFile();
-
- // invoke process, and wait for response
- int rval = runProcess(timeoutInSeconds, dir.toFile(), output, "mvn", "compile");
- logger.info("RepositoryAudit: 'mvn' return value = {}", rval);
- if (rval != 0) {
- logger.error("RepositoryAudit: 'mvn compile' invocation failed");
- if (!ignoreErrors) {
- response.append("'mvn compile' invocation failed\n");
- setResponse(response.toString());
+ private void createPomFile(final Path repo, final Path pom)
+ throws IOException {
+
+ artifacts.add(new Artifact("org.apache.maven/maven-embedder/3.2.2"));
+
+ StringBuilder sb = new StringBuilder();
+ sb.append("<project xmlns=\"http://maven.apache.org/POM/4.0.0\" xmlns:xsi=\"http://www.w3.org/2001/XMLSchema-instance\"\n"
+ + " xsi:schemaLocation=\"http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd\">\n"
+ + "\n"
+ + " <modelVersion>4.0.0</modelVersion>\n"
+ + " <groupId>empty</groupId>\n"
+ + " <artifactId>empty</artifactId>\n"
+ + " <version>1.0-SNAPSHOT</version>\n"
+ + " <packaging>pom</packaging>\n"
+ + "\n"
+ + " <build>\n"
+ + " <plugins>\n"
+ + " <plugin>\n"
+ + " <groupId>org.apache.maven.plugins</groupId>\n"
+ + " <artifactId>maven-dependency-plugin</artifactId>\n"
+ + " <version>2.10</version>\n"
+ + " <executions>\n"
+ + " <execution>\n"
+ + " <id>copy</id>\n"
+ + " <goals>\n"
+ + " <goal>copy</goal>\n"
+ + " </goals>\n"
+ + " <configuration>\n"
+ + " <localRepositoryDirectory>")
+ .append(repo)
+ .append("</localRepositoryDirectory>\n")
+ .append(" <artifactItems>\n");
+
+ for (Artifact artifact : artifacts) {
+ // each artifact results in an 'artifactItem' element
+ sb.append(" <artifactItem>\n"
+ + " <groupId>")
+ .append(artifact.groupId)
+ .append("</groupId>\n"
+ + " <artifactId>")
+ .append(artifact.artifactId)
+ .append("</artifactId>\n"
+ + " <version>")
+ .append(artifact.version)
+ .append("</version>\n"
+ + " <type>")
+ .append(artifact.type)
+ .append("</type>\n"
+ + " </artifactItem>\n");
}
- }
-
- /*
- * 4a) Check attempted and successful downloads from output file
- * Note: at present, this step just generates log messages,
- * but doesn't do any verification.
- */
- if (rval == 0 && output != null) {
- // place output in 'fileContents' (replacing the Return characters
- // with Newline)
- byte[] outputData = new byte[(int)output.length()];
- String fileContents;
- try (FileInputStream fis = new FileInputStream(output)) {
- //
- // Ideally this should be in a loop or even better use
- // Java 8 nio functionality.
- //
- int bytesRead = fis.read(outputData);
- logger.info("fileContents read {} bytes", bytesRead);
- fileContents = new String(outputData).replace('\r','\n');
+ sb.append(" </artifactItems>\n"
+ + " </configuration>\n"
+ + " </execution>\n"
+ + " </executions>\n"
+ + " </plugin>\n"
+ + " </plugins>\n"
+ + " </build>\n"
+ + "</project>\n");
+
+ try (FileOutputStream fos = new FileOutputStream(pom.toFile())) {
+ fos.write(sb.toString().getBytes());
}
+ }
- // generate log messages from 'Downloading' and 'Downloaded'
- // messages within the 'mvn' output
- int index = 0;
- while ((index = fileContents.indexOf("\nDown", index)) > 0) {
- index += 5;
- if (fileContents.regionMatches(index, "loading: ", 0, 9)) {
- index += 9;
- int endIndex = fileContents.indexOf('\n', index);
- logger.info("RepositoryAudit: Attempted download: '{}'",
- fileContents.substring(index, endIndex));
- index = endIndex;
- } else if (fileContents.regionMatches(index, "loaded: ", 0, 8)) {
- index += 8;
- int endIndex = fileContents.indexOf(' ', index);
- logger.info("RepositoryAudit: Successful download: '{}'",fileContents.substring(index, endIndex));
- index = endIndex;
+ private int runMaven(File output) throws IOException, InterruptedException {
+ int rval = runProcess(timeoutInSeconds, dir.toFile(), output, "mvn", "compile");
+ logger.info("RepositoryAudit: 'mvn' return value = {}", rval);
+ if (rval != 0) {
+ logger.error("RepositoryAudit: 'mvn compile' invocation failed");
+ if (!ignoreErrors) {
+ response.append("'mvn compile' invocation failed\n");
+ setResponse(response.toString());
}
}
+ return rval;
}
- /*
- * 5) Check the contents of the directory to make sure the downloads
- * were successful
- */
- for (Artifact artifact : artifacts) {
- if (repo.resolve(artifact.groupId.replace('.','/'))
- .resolve(artifact.artifactId)
- .resolve(artifact.version)
- .resolve(artifact.artifactId + "-" + artifact.version + "."
- + artifact.type).toFile().exists()) {
- // artifact exists, as expected
- logger.info("RepositoryAudit: {} : exists", artifact.toString());
- } else {
- // Audit ERROR: artifact download failed for some reason
- logger.error("RepositoryAudit: {}: does not exist", artifact.toString());
- if (!ignoreErrors) {
- response.append("Failed to download artifact: ")
- .append(artifact).append('\n');
- setResponse(response.toString());
+ private void verifyDownloads(final Path repo) {
+ for (Artifact artifact : artifacts) {
+ if (repo.resolve(artifact.groupId.replace('.','/'))
+ .resolve(artifact.artifactId)
+ .resolve(artifact.version)
+ .resolve(artifact.artifactId + "-" + artifact.version + "."
+ + artifact.type).toFile().exists()) {
+ // artifact exists, as expected
+ logger.info("RepositoryAudit: {} : exists", artifact.toString());
+ } else {
+ // Audit ERROR: artifact download failed for some reason
+ logger.error("RepositoryAudit: {}: does not exist", artifact.toString());
+ if (!ignoreErrors) {
+ response.append("Failed to download artifact: ")
+ .append(artifact).append('\n');
+ setResponse(response.toString());
+ }
}
}
}
- /*
- * 6) Use 'curl' to delete the uploaded test file
- * (only if repository information is specified)
- */
- if (upload) {
+ private void deleteUploadedTestFile() throws IOException, InterruptedException {
if (runProcess(timeoutInSeconds, dir.toFile(), null,
"curl",
"--request", "DELETE",
@@ -370,11 +396,41 @@ public class RepositoryAudit extends DroolsPdpIntegrityMonitor.AuditBase {
artifacts.add(new Artifact(groupId, artifactId, version, "txt"));
}
}
+ }
- /*
- * 7) Remove the temporary directory
- */
- Files.walkFileTree(dir, new RecursivelyDeleteDirectory());
+ private void generateDownloadLogs(File output) throws IOException {
+ // place output in 'fileContents' (replacing the Return characters
+ // with Newline)
+ byte[] outputData = new byte[(int)output.length()];
+ String fileContents;
+ try (FileInputStream fis = new FileInputStream(output)) {
+ //
+ // Ideally this should be in a loop or even better use
+ // Java 8 nio functionality.
+ //
+ int bytesRead = fis.read(outputData);
+ logger.info("fileContents read {} bytes", bytesRead);
+ fileContents = new String(outputData).replace('\r','\n');
+ }
+
+ // generate log messages from 'Downloading' and 'Downloaded'
+ // messages within the 'mvn' output
+ int index = 0;
+ while ((index = fileContents.indexOf("\nDown", index)) > 0) {
+ index += 5;
+ if (fileContents.regionMatches(index, "loading: ", 0, 9)) {
+ index += 9;
+ int endIndex = fileContents.indexOf('\n', index);
+ logger.info("RepositoryAudit: Attempted download: '{}'",
+ fileContents.substring(index, endIndex));
+ index = endIndex;
+ } else if (fileContents.regionMatches(index, "loaded: ", 0, 8)) {
+ index += 8;
+ int endIndex = fileContents.indexOf(' ', index);
+ logger.info("RepositoryAudit: Successful download: '{}'",fileContents.substring(index, endIndex));
+ index = endIndex;
+ }
+ }
}
/**
diff --git a/policy-core/src/main/java/org/onap/policy/drools/core/PolicyContainer.java b/policy-core/src/main/java/org/onap/policy/drools/core/PolicyContainer.java
index 584b3845..bd97ddb1 100644
--- a/policy-core/src/main/java/org/onap/policy/drools/core/PolicyContainer.java
+++ b/policy-core/src/main/java/org/onap/policy/drools/core/PolicyContainer.java
@@ -448,24 +448,30 @@ public class PolicyContainer implements Startable {
*/
public synchronized void startScanner(ReleaseId releaseId) {
String version = releaseId.getVersion();
- if (!scannerStarted && scanner == null && version != null
- && ("LATEST".equals(version) || "RELEASE".equals(version) || version.endsWith("-SNAPSHOT"))) {
- // create the scanner, and poll at 60 second intervals
- try {
- scannerStarted = true;
-
- // start this in a separate thread -- it can block for a long time
- new Thread("Scanner Starter " + getName()) {
- @Override
- public void run() {
- scanner = kieServices.newKieScanner(kieContainer);
- scanner.start(60000L);
- }
- }.start();
- } catch (Exception e) {
- // sometimes the scanner initialization fails for some reason
- logger.error("startScanner error", e);
- }
+
+ if (scannerStarted || scanner != null || version == null) {
+ return;
+ }
+
+ if (!("LATEST".equals(version) || "RELEASE".equals(version) || version.endsWith("-SNAPSHOT"))) {
+ return;
+ }
+
+ // create the scanner, and poll at 60 second intervals
+ try {
+ scannerStarted = true;
+
+ // start this in a separate thread -- it can block for a long time
+ new Thread("Scanner Starter " + getName()) {
+ @Override
+ public void run() {
+ scanner = kieServices.newKieScanner(kieContainer);
+ scanner.start(60000L);
+ }
+ }.start();
+ } catch (Exception e) {
+ // sometimes the scanner initialization fails for some reason
+ logger.error("startScanner error", e);
}
}
diff --git a/policy-management/pom.xml b/policy-management/pom.xml
index 857e62cf..60175f6c 100644
--- a/policy-management/pom.xml
+++ b/policy-management/pom.xml
@@ -330,5 +330,12 @@
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.awaitility</groupId>
+ <artifactId>awaitility</artifactId>
+ <version>3.0.0</version>
+ <scope>test</scope>
+ </dependency>
+
</dependencies>
</project>
diff --git a/policy-management/src/main/java/org/onap/policy/drools/controller/IndexedDroolsControllerFactory.java b/policy-management/src/main/java/org/onap/policy/drools/controller/IndexedDroolsControllerFactory.java
index 733a492d..a4c546f8 100644
--- a/policy-management/src/main/java/org/onap/policy/drools/controller/IndexedDroolsControllerFactory.java
+++ b/policy-management/src/main/java/org/onap/policy/drools/controller/IndexedDroolsControllerFactory.java
@@ -171,8 +171,6 @@ class IndexedDroolsControllerFactory implements DroolsControllerFactory {
protected List<TopicCoderFilterConfiguration> codersAndFilters(Properties properties,
List<? extends Topic> topicEntities) {
- String propertyTopicEntityPrefix;
-
List<TopicCoderFilterConfiguration> topics2DecodedClasses2Filters = new ArrayList<>();
if (topicEntities == null || topicEntities.isEmpty()) {
@@ -181,87 +179,102 @@ class IndexedDroolsControllerFactory implements DroolsControllerFactory {
for (Topic topic : topicEntities) {
- /* source or sink ? ueb or dmaap? */
- boolean isSource = topic instanceof TopicSource;
- CommInfrastructure commInfra = topic.getTopicCommInfrastructure();
- if (commInfra == CommInfrastructure.UEB) {
- if (isSource) {
- propertyTopicEntityPrefix = PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS + ".";
- } else {
- propertyTopicEntityPrefix = PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS + ".";
- }
- } else if (commInfra == CommInfrastructure.DMAAP) {
- if (isSource) {
- propertyTopicEntityPrefix = PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + ".";
- } else {
- propertyTopicEntityPrefix = PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + ".";
- }
- } else if (commInfra == CommInfrastructure.NOOP) {
- if (isSource) {
- propertyTopicEntityPrefix = PolicyEndPointProperties.PROPERTY_NOOP_SOURCE_TOPICS + ".";
- } else {
- propertyTopicEntityPrefix = PolicyEndPointProperties.PROPERTY_NOOP_SINK_TOPICS + ".";
- }
- } else {
- throw new IllegalArgumentException("Invalid Communication Infrastructure: " + commInfra);
- }
-
// 1. first the topic
String firstTopic = topic.getTopic();
+ String propertyTopicEntityPrefix = getPropertyTopicPrefix(topic) + firstTopic;
+
// 2. check if there is a custom decoder for this topic that the user prefers to use
// instead of the ones provided in the platform
- String customGson = properties.getProperty(propertyTopicEntityPrefix + firstTopic
- + PolicyEndPointProperties.PROPERTY_TOPIC_EVENTS_CUSTOM_MODEL_CODER_GSON_SUFFIX);
-
- CustomGsonCoder customGsonCoder = null;
- if (customGson != null && !customGson.isEmpty()) {
- try {
- customGsonCoder = new CustomGsonCoder(customGson);
- } catch (IllegalArgumentException e) {
- logger.warn("{}: cannot create custom-gson-coder {} because of {}", this, customGson,
- e.getMessage(), e);
- }
- }
+ CustomGsonCoder customGsonCoder = getCustomCoder(properties, propertyTopicEntityPrefix);
// 3. second the list of classes associated with each topic
String eventClasses = properties
- .getProperty(propertyTopicEntityPrefix + firstTopic
- + PolicyEndPointProperties.PROPERTY_TOPIC_EVENTS_SUFFIX);
+ .getProperty(propertyTopicEntityPrefix + PolicyEndPointProperties.PROPERTY_TOPIC_EVENTS_SUFFIX);
if (eventClasses == null || eventClasses.isEmpty()) {
logger.warn("There are no event classes for topic {}", firstTopic);
continue;
}
- List<PotentialCoderFilter> classes2Filters = new ArrayList<>();
+ List<PotentialCoderFilter> classes2Filters =
+ getFilterExpressions(properties, propertyTopicEntityPrefix, eventClasses);
- List<String> topicClasses = new ArrayList<>(Arrays.asList(eventClasses.split("\\s*,\\s*")));
+ TopicCoderFilterConfiguration topic2Classes2Filters =
+ new TopicCoderFilterConfiguration(firstTopic, classes2Filters, customGsonCoder);
+ topics2DecodedClasses2Filters.add(topic2Classes2Filters);
+ }
- for (String theClass : topicClasses) {
+ return topics2DecodedClasses2Filters;
+ }
+ private String getPropertyTopicPrefix(Topic topic) {
+ boolean isSource = topic instanceof TopicSource;
+ CommInfrastructure commInfra = topic.getTopicCommInfrastructure();
+ if (commInfra == CommInfrastructure.UEB) {
+ if (isSource) {
+ return PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS + ".";
+ } else {
+ return PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS + ".";
+ }
+ } else if (commInfra == CommInfrastructure.DMAAP) {
+ if (isSource) {
+ return PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + ".";
+ } else {
+ return PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + ".";
+ }
+ } else if (commInfra == CommInfrastructure.NOOP) {
+ if (isSource) {
+ return PolicyEndPointProperties.PROPERTY_NOOP_SOURCE_TOPICS + ".";
+ } else {
+ return PolicyEndPointProperties.PROPERTY_NOOP_SINK_TOPICS + ".";
+ }
+ } else {
+ throw new IllegalArgumentException("Invalid Communication Infrastructure: " + commInfra);
+ }
+ }
+
+ private CustomGsonCoder getCustomCoder(Properties properties, String propertyPrefix) {
+ String customGson = properties.getProperty(propertyPrefix
+ + PolicyEndPointProperties.PROPERTY_TOPIC_EVENTS_CUSTOM_MODEL_CODER_GSON_SUFFIX);
+
+ CustomGsonCoder customGsonCoder = null;
+ if (customGson != null && !customGson.isEmpty()) {
+ try {
+ customGsonCoder = new CustomGsonCoder(customGson);
+ } catch (IllegalArgumentException e) {
+ logger.warn("{}: cannot create custom-gson-coder {} because of {}", this, customGson,
+ e.getMessage(), e);
+ }
+ }
+ return customGsonCoder;
+ }
- // 4. third, for each coder class, get the filter expression
+ private List<PotentialCoderFilter> getFilterExpressions(Properties properties, String propertyPrefix,
+ String eventClasses) {
- String filter = properties
- .getProperty(propertyTopicEntityPrefix + firstTopic
- + PolicyEndPointProperties.PROPERTY_TOPIC_EVENTS_SUFFIX
- + "." + theClass + PolicyEndPointProperties.PROPERTY_TOPIC_EVENTS_FILTER_SUFFIX);
+ List<PotentialCoderFilter> classes2Filters = new ArrayList<>();
- JsonProtocolFilter protocolFilter = new JsonProtocolFilter(filter);
- PotentialCoderFilter class2Filters = new PotentialCoderFilter(theClass, protocolFilter);
- classes2Filters.add(class2Filters);
- }
+ List<String> topicClasses = new ArrayList<>(Arrays.asList(eventClasses.split("\\s*,\\s*")));
- TopicCoderFilterConfiguration topic2Classes2Filters =
- new TopicCoderFilterConfiguration(firstTopic, classes2Filters, customGsonCoder);
- topics2DecodedClasses2Filters.add(topic2Classes2Filters);
+ for (String theClass : topicClasses) {
+
+ // 4. for each coder class, get the filter expression
+
+ String filter = properties
+ .getProperty(propertyPrefix
+ + PolicyEndPointProperties.PROPERTY_TOPIC_EVENTS_SUFFIX
+ + "." + theClass + PolicyEndPointProperties.PROPERTY_TOPIC_EVENTS_FILTER_SUFFIX);
+
+ JsonProtocolFilter protocolFilter = new JsonProtocolFilter(filter);
+ PotentialCoderFilter class2Filters = new PotentialCoderFilter(theClass, protocolFilter);
+ classes2Filters.add(class2Filters);
}
- return topics2DecodedClasses2Filters;
+ return classes2Filters;
}
@Override
diff --git a/policy-management/src/main/java/org/onap/policy/drools/controller/internal/MavenDroolsController.java b/policy-management/src/main/java/org/onap/policy/drools/controller/internal/MavenDroolsController.java
index ca1f2283..77bfcf9f 100644
--- a/policy-management/src/main/java/org/onap/policy/drools/controller/internal/MavenDroolsController.java
+++ b/policy-management/src/main/java/org/onap/policy/drools/controller/internal/MavenDroolsController.java
@@ -41,6 +41,7 @@ import org.kie.api.runtime.rule.QueryResultsRow;
import org.onap.policy.common.endpoints.event.comm.TopicSink;
import org.onap.policy.common.gson.annotation.GsonJsonIgnore;
import org.onap.policy.common.gson.annotation.GsonJsonProperty;
+import org.onap.policy.common.utils.services.FeatureApiUtils;
import org.onap.policy.common.utils.services.OrderedServiceImpl;
import org.onap.policy.drools.controller.DroolsController;
import org.onap.policy.drools.controller.DroolsControllerConstants;
@@ -186,25 +187,11 @@ public class MavenDroolsController implements DroolsController {
logger.info("updating version -> [{}:{}:{}]", newGroupId, newArtifactId, newVersion);
- if (newGroupId == null || newGroupId.isEmpty()) {
- throw new IllegalArgumentException("Missing maven group-id coordinate");
- }
+ validateText(newGroupId, "Missing maven group-id coordinate");
+ validateText(newArtifactId, "Missing maven artifact-id coordinate");
+ validateText(newVersion, "Missing maven version coordinate");
- if (newArtifactId == null || newArtifactId.isEmpty()) {
- throw new IllegalArgumentException("Missing maven artifact-id coordinate");
- }
-
- if (newVersion == null || newVersion.isEmpty()) {
- throw new IllegalArgumentException("Missing maven version coordinate");
- }
-
- if (newGroupId.equalsIgnoreCase(DroolsControllerConstants.NO_GROUP_ID)
- || newArtifactId.equalsIgnoreCase(DroolsControllerConstants.NO_ARTIFACT_ID)
- || newVersion.equalsIgnoreCase(DroolsControllerConstants.NO_VERSION)) {
- throw new IllegalArgumentException("BRAINLESS maven coordinates provided: "
- + newGroupId + ":" + newArtifactId + ":"
- + newVersion);
- }
+ validateHasBrain(newGroupId, newArtifactId, newVersion);
if (newGroupId.equalsIgnoreCase(this.getGroupId())
&& newArtifactId.equalsIgnoreCase(this.getArtifactId())
@@ -214,13 +201,7 @@ public class MavenDroolsController implements DroolsController {
return;
}
- if (!newGroupId.equalsIgnoreCase(this.getGroupId())
- || !newArtifactId.equalsIgnoreCase(this.getArtifactId())) {
- throw new IllegalArgumentException(
- "Group ID and Artifact ID maven coordinates must be identical for the upgrade: "
- + newGroupId + ":" + newArtifactId + ":"
- + newVersion + " vs. " + this);
- }
+ validateNewVersion(newGroupId, newArtifactId, newVersion);
/* upgrade */
String messages = this.policyContainer.updateToVersion(newVersion);
@@ -239,6 +220,32 @@ public class MavenDroolsController implements DroolsController {
logger.info("UPDATE-TO-VERSION: completed {}", this);
}
+ private void validateText(String text, String errorMessage) {
+ if (text == null || text.isEmpty()) {
+ throw new IllegalArgumentException(errorMessage);
+ }
+ }
+
+ private void validateHasBrain(String newGroupId, String newArtifactId, String newVersion) {
+ if (newGroupId.equalsIgnoreCase(DroolsControllerConstants.NO_GROUP_ID)
+ || newArtifactId.equalsIgnoreCase(DroolsControllerConstants.NO_ARTIFACT_ID)
+ || newVersion.equalsIgnoreCase(DroolsControllerConstants.NO_VERSION)) {
+ throw new IllegalArgumentException("BRAINLESS maven coordinates provided: "
+ + newGroupId + ":" + newArtifactId + ":"
+ + newVersion);
+ }
+ }
+
+ private void validateNewVersion(String newGroupId, String newArtifactId, String newVersion) {
+ if (!newGroupId.equalsIgnoreCase(this.getGroupId())
+ || !newArtifactId.equalsIgnoreCase(this.getArtifactId())) {
+ throw new IllegalArgumentException(
+ "Group ID and Artifact ID maven coordinates must be identical for the upgrade: "
+ + newGroupId + ":" + newArtifactId + ":"
+ + newVersion + " vs. " + this);
+ }
+ }
+
/**
* initialize decoders for all the topics supported by this controller
* Note this is critical to be done after the Policy Container is
@@ -259,18 +266,7 @@ public class MavenDroolsController implements DroolsController {
for (TopicCoderFilterConfiguration coderConfig: coderConfigurations) {
String topic = coderConfig.getTopic();
- CustomGsonCoder customGsonCoder = coderConfig.getCustomGsonCoder();
- if (customGsonCoder != null
- && customGsonCoder.getClassContainer() != null
- && !customGsonCoder.getClassContainer().isEmpty()) {
-
- String customGsonCoderClass = customGsonCoder.getClassContainer();
- if (!isClass(customGsonCoderClass)) {
- throw makeRetrieveEx(customGsonCoderClass);
- } else {
- logClassFetched(customGsonCoderClass);
- }
- }
+ CustomGsonCoder customGsonCoder = getCustomCoder(coderConfig);
List<PotentialCoderFilter> coderFilters = coderConfig.getCoderFilters();
if (coderFilters == null || coderFilters.isEmpty()) {
@@ -308,6 +304,22 @@ public class MavenDroolsController implements DroolsController {
}
}
+ private CustomGsonCoder getCustomCoder(TopicCoderFilterConfiguration coderConfig) {
+ CustomGsonCoder customGsonCoder = coderConfig.getCustomGsonCoder();
+ if (customGsonCoder != null
+ && customGsonCoder.getClassContainer() != null
+ && !customGsonCoder.getClassContainer().isEmpty()) {
+
+ String customGsonCoderClass = customGsonCoder.getClassContainer();
+ if (!isClass(customGsonCoderClass)) {
+ throw makeRetrieveEx(customGsonCoderClass);
+ } else {
+ logClassFetched(customGsonCoderClass);
+ }
+ }
+ return customGsonCoder;
+ }
+
/**
* Logs an error and makes an exception for an item that cannot be retrieved.
* @param itemName the item to retrieve
@@ -520,15 +532,11 @@ public class MavenDroolsController implements DroolsController {
// Broadcast
- for (DroolsControllerFeatureApi feature : getDroolsProviders().getList()) {
- try {
- if (feature.beforeInsert(this, event)) {
- return true;
- }
- } catch (Exception e) {
- logger.error("{}: feature {} before-insert failure because of {}",
- this, feature.getClass().getName(), e.getMessage(), e);
- }
+ if (FeatureApiUtils.apply(getDroolsProviders().getList(),
+ feature -> feature.beforeInsert(this, event),
+ (feature, ex) -> logger.error("{}: feature {} before-insert failure because of {}", this,
+ feature.getClass().getName(), ex.getMessage(), ex))) {
+ return true;
}
boolean successInject = this.policyContainer.insertAll(event);
@@ -536,16 +544,10 @@ public class MavenDroolsController implements DroolsController {
logger.warn(this + "Failed to inject into PolicyContainer {}", this.getSessionNames());
}
- for (DroolsControllerFeatureApi feature : getDroolsProviders().getList()) {
- try {
- if (feature.afterInsert(this, event, successInject)) {
- return true;
- }
- } catch (Exception e) {
- logger.error("{}: feature {} after-insert failure because of {}",
- this, feature.getClass().getName(), e.getMessage(), e);
- }
- }
+ FeatureApiUtils.apply(getDroolsProviders().getList(),
+ feature -> feature.afterInsert(this, event, successInject),
+ (feature, ex) -> logger.error("{}: feature {} after-insert failure because of {}", this,
+ feature.getClass().getName(), ex.getMessage(), ex));
return true;
@@ -840,18 +842,7 @@ public class MavenDroolsController implements DroolsController {
PolicySession session = getSession(sessionName);
KieSession kieSession = session.getKieSession();
- boolean found = false;
- for (KiePackage kiePackage : kieSession.getKieBase().getKiePackages()) {
- for (Query q : kiePackage.getQueries()) {
- if (q.getName() != null && q.getName().equals(queryName)) {
- found = true;
- break;
- }
- }
- }
- if (!found) {
- throw new IllegalArgumentException("Invalid Query Name: " + queryName);
- }
+ validateQueryName(kieSession, queryName);
List<Object> factObjects = new ArrayList<>();
@@ -870,6 +861,18 @@ public class MavenDroolsController implements DroolsController {
return factObjects;
}
+ private void validateQueryName(KieSession kieSession, String queryName) {
+ for (KiePackage kiePackage : kieSession.getKieBase().getKiePackages()) {
+ for (Query q : kiePackage.getQueries()) {
+ if (q.getName() != null && q.getName().equals(queryName)) {
+ return;
+ }
+ }
+ }
+
+ throw new IllegalArgumentException("Invalid Query Name: " + queryName);
+ }
+
@Override
public <T> boolean delete(@NonNull String sessionName, @NonNull T fact) {
String factClassName = fact.getClass().getName();
diff --git a/policy-management/src/main/java/org/onap/policy/drools/protocol/coders/GenericEventProtocolCoder.java b/policy-management/src/main/java/org/onap/policy/drools/protocol/coders/GenericEventProtocolCoder.java
index 89a7a420..cb4ce07e 100644
--- a/policy-management/src/main/java/org/onap/policy/drools/protocol/coders/GenericEventProtocolCoder.java
+++ b/policy-management/src/main/java/org/onap/policy/drools/protocol/coders/GenericEventProtocolCoder.java
@@ -124,40 +124,44 @@ abstract class GenericEventProtocolCoder {
coders.put(key, coderTools);
- if (reverseCoders.containsKey(reverseKey)) {
- // There is another controller (different group id/artifact id/topic)
- // that shares the class and the topic.
-
- List<ProtocolCoderToolset> toolsets =
- reverseCoders.get(reverseKey);
- boolean present = false;
- for (ProtocolCoderToolset parserSet : toolsets) {
- // just doublecheck
- present = parserSet.getControllerId().equals(key);
- if (present) {
- /* anomaly */
- logger.error(
- "{}: unexpected toolset reverse mapping found for {}:{}: {}",
- this,
- reverseKey,
- key,
- parserSet);
- }
- }
+ addReverseCoder(coderTools, key, reverseKey);
+ }
+ }
+ private void addReverseCoder(GsonProtocolCoderToolset coderTools, String key, String reverseKey) {
+ if (reverseCoders.containsKey(reverseKey)) {
+ // There is another controller (different group id/artifact id/topic)
+ // that shares the class and the topic.
+
+ List<ProtocolCoderToolset> toolsets =
+ reverseCoders.get(reverseKey);
+ boolean present = false;
+ for (ProtocolCoderToolset parserSet : toolsets) {
+ // just doublecheck
+ present = parserSet.getControllerId().equals(key);
if (present) {
- return;
- } else {
- logger.info("{}: adding coder set for {}: {} ", this, reverseKey, coderTools);
- toolsets.add(coderTools);
+ /* anomaly */
+ logger.error(
+ "{}: unexpected toolset reverse mapping found for {}:{}: {}",
+ this,
+ reverseKey,
+ key,
+ parserSet);
}
+ }
+
+ if (present) {
+ return;
} else {
- List<ProtocolCoderToolset> toolsets = new ArrayList<>();
+ logger.info("{}: adding coder set for {}: {} ", this, reverseKey, coderTools);
toolsets.add(coderTools);
-
- logger.info("{}: adding toolset for reverse key {}: {}", this, reverseKey, toolsets);
- reverseCoders.put(reverseKey, toolsets);
}
+ } else {
+ List<ProtocolCoderToolset> toolsets = new ArrayList<>();
+ toolsets.add(coderTools);
+
+ logger.info("{}: adding toolset for reverse key {}: {}", this, reverseKey, toolsets);
+ reverseCoders.put(reverseKey, toolsets);
}
}
@@ -217,30 +221,36 @@ abstract class GenericEventProtocolCoder {
for (CoderFilters codeFilter : coderToolset.getCoders()) {
String className = codeFilter.getCodedClass();
String reverseKey = this.reverseCodersKey(topic, className);
- if (this.reverseCoders.containsKey(reverseKey)) {
- List<ProtocolCoderToolset> toolsets =
- this.reverseCoders.get(reverseKey);
- Iterator<ProtocolCoderToolset> toolsetsIter =
- toolsets.iterator();
- while (toolsetsIter.hasNext()) {
- ProtocolCoderToolset toolset = toolsetsIter.next();
- if (toolset.getControllerId().equals(key)) {
- logger.info(
- "{}: removed coder from toolset for {} from reverse mapping", this, reverseKey);
- toolsetsIter.remove();
- }
- }
-
- if (this.reverseCoders.get(reverseKey).isEmpty()) {
- logger.info("{}: removing reverse mapping for {}: ", this, reverseKey);
- this.reverseCoders.remove(reverseKey);
- }
- }
+ removeReverseCoder(key, reverseKey);
}
}
}
}
+ private void removeReverseCoder(String key, String reverseKey) {
+ if (!this.reverseCoders.containsKey(reverseKey)) {
+ return;
+ }
+
+ List<ProtocolCoderToolset> toolsets =
+ this.reverseCoders.get(reverseKey);
+ Iterator<ProtocolCoderToolset> toolsetsIter =
+ toolsets.iterator();
+ while (toolsetsIter.hasNext()) {
+ ProtocolCoderToolset toolset = toolsetsIter.next();
+ if (toolset.getControllerId().equals(key)) {
+ logger.info(
+ "{}: removed coder from toolset for {} from reverse mapping", this, reverseKey);
+ toolsetsIter.remove();
+ }
+ }
+
+ if (this.reverseCoders.get(reverseKey).isEmpty()) {
+ logger.info("{}: removing reverse mapping for {}: ", this, reverseKey);
+ this.reverseCoders.remove(reverseKey);
+ }
+ }
+
/**
* does it support coding.
*
@@ -446,20 +456,7 @@ abstract class GenericEventProtocolCoder {
}
for (ProtocolCoderToolset encoderSet : toolsets) {
- // figure out the right toolset
- String groupId = encoderSet.getGroupId();
- String artifactId = encoderSet.getArtifactId();
- List<CoderFilters> coderFilters = encoderSet.getCoders();
- for (CoderFilters coder : coderFilters) {
- if (coder.getCodedClass().equals(encodedClass.getClass().getName())) {
- DroolsController droolsController =
- DroolsControllerConstants.getFactory().get(groupId, artifactId, "");
- if (droolsController.ownsCoder(
- encodedClass.getClass(), coder.getModelClassLoaderHash())) {
- droolsControllers.add(droolsController);
- }
- }
- }
+ addToolsetControllers(droolsControllers, encodedClass, encoderSet);
}
if (droolsControllers.isEmpty()) {
@@ -473,6 +470,24 @@ abstract class GenericEventProtocolCoder {
return droolsControllers;
}
+ private void addToolsetControllers(List<DroolsController> droolsControllers, Object encodedClass,
+ ProtocolCoderToolset encoderSet) {
+ // figure out the right toolset
+ String groupId = encoderSet.getGroupId();
+ String artifactId = encoderSet.getArtifactId();
+ List<CoderFilters> coderFilters = encoderSet.getCoders();
+ for (CoderFilters coder : coderFilters) {
+ if (coder.getCodedClass().equals(encodedClass.getClass().getName())) {
+ DroolsController droolsController =
+ DroolsControllerConstants.getFactory().get(groupId, artifactId, "");
+ if (droolsController.ownsCoder(
+ encodedClass.getClass(), coder.getModelClassLoaderHash())) {
+ droolsControllers.add(droolsController);
+ }
+ }
+ }
+ }
+
/**
* get all filters by maven coordinates and topic.
*
diff --git a/policy-management/src/main/java/org/onap/policy/drools/system/PolicyController.java b/policy-management/src/main/java/org/onap/policy/drools/system/PolicyController.java
index 17247f41..82cd015e 100644
--- a/policy-management/src/main/java/org/onap/policy/drools/system/PolicyController.java
+++ b/policy-management/src/main/java/org/onap/policy/drools/system/PolicyController.java
@@ -48,12 +48,12 @@ public interface PolicyController extends Startable, Lockable {
/**
* Get the topic readers of interest for this controller.
*/
- List<? extends TopicSource> getTopicSources();
+ List<TopicSource> getTopicSources();
/**
* Get the topic readers of interest for this controller.
*/
- List<? extends TopicSink> getTopicSinks();
+ List<TopicSink> getTopicSinks();
/**
* Get the Drools Controller.
diff --git a/policy-management/src/main/java/org/onap/policy/drools/system/PolicyEngineManager.java b/policy-management/src/main/java/org/onap/policy/drools/system/PolicyEngineManager.java
index 36d8ca59..32e3f674 100644
--- a/policy-management/src/main/java/org/onap/policy/drools/system/PolicyEngineManager.java
+++ b/policy-management/src/main/java/org/onap/policy/drools/system/PolicyEngineManager.java
@@ -24,6 +24,7 @@ import static org.onap.policy.drools.system.PolicyEngineConstants.TELEMETRY_SERV
import static org.onap.policy.drools.system.PolicyEngineConstants.TELEMETRY_SERVER_DEFAULT_NAME;
import static org.onap.policy.drools.system.PolicyEngineConstants.TELEMETRY_SERVER_DEFAULT_PORT;
+import com.att.aft.dme2.internal.apache.commons.lang3.StringUtils;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.gson.Gson;
@@ -31,6 +32,11 @@ import com.google.gson.GsonBuilder;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+import java.util.stream.Stream;
+import lombok.Getter;
import org.onap.policy.common.endpoints.event.comm.Topic;
import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
import org.onap.policy.common.endpoints.event.comm.TopicEndpoint;
@@ -43,6 +49,7 @@ import org.onap.policy.common.endpoints.http.server.HttpServletServerFactoryInst
import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
import org.onap.policy.common.gson.annotation.GsonJsonIgnore;
import org.onap.policy.common.gson.annotation.GsonJsonProperty;
+import org.onap.policy.common.utils.services.FeatureApiUtils;
import org.onap.policy.drools.controller.DroolsController;
import org.onap.policy.drools.controller.DroolsControllerConstants;
import org.onap.policy.drools.core.PolicyContainer;
@@ -89,11 +96,13 @@ class PolicyEngineManager implements PolicyEngine {
/**
* Is the Policy Engine running.
*/
+ @Getter
private volatile boolean alive = false;
/**
* Is the engine locked.
*/
+ @Getter
private volatile boolean locked = false;
/**
@@ -109,16 +118,19 @@ class PolicyEngineManager implements PolicyEngine {
/**
* Policy Engine Sources.
*/
- private List<? extends TopicSource> sources = new ArrayList<>();
+ @Getter
+ private List<TopicSource> sources = new ArrayList<>();
/**
* Policy Engine Sinks.
*/
- private List<? extends TopicSink> sinks = new ArrayList<>();
+ @Getter
+ private List<TopicSink> sinks = new ArrayList<>();
/**
* Policy Engine HTTP Servers.
*/
+ @Getter
private List<HttpServletServer> httpServers = new ArrayList<>();
/**
@@ -130,15 +142,11 @@ class PolicyEngineManager implements PolicyEngine {
@Override
public synchronized void boot(String[] cliArgs) {
- for (final PolicyEngineFeatureApi feature : getEngineProviders()) {
- try {
- if (feature.beforeBoot(this, cliArgs)) {
- return;
- }
- } catch (final Exception e) {
- logger.error("{}: feature {} before-boot failure because of {}", this, feature.getClass().getName(),
- e.getMessage(), e);
- }
+ if (FeatureApiUtils.apply(getEngineProviders(),
+ feature -> feature.beforeBoot(this, cliArgs),
+ (feature, ex) -> logger.error("{}: feature {} before-boot failure because of {}", this,
+ feature.getClass().getName(), ex.getMessage(), ex))) {
+ return;
}
try {
@@ -147,16 +155,10 @@ class PolicyEngineManager implements PolicyEngine {
logger.error("{}: cannot init policy-container because of {}", this, e.getMessage(), e);
}
- for (final PolicyEngineFeatureApi feature : getEngineProviders()) {
- try {
- if (feature.afterBoot(this)) {
- return;
- }
- } catch (final Exception e) {
- logger.error("{}: feature {} after-boot failure because of {}", this, feature.getClass().getName(),
- e.getMessage(), e);
- }
- }
+ FeatureApiUtils.apply(getEngineProviders(),
+ feature -> feature.afterBoot(this),
+ (feature, ex) -> logger.error("{}: feature {} after-boot failure because of {}", this,
+ feature.getClass().getName(), ex.getMessage(), ex));
}
@Override
@@ -220,15 +222,11 @@ class PolicyEngineManager implements PolicyEngine {
}
/* policy-engine dispatch pre configure hook */
- for (final PolicyEngineFeatureApi feature : getEngineProviders()) {
- try {
- if (feature.beforeConfigure(this, properties)) {
- return;
- }
- } catch (final Exception e) {
- logger.error("{}: feature {} before-configure failure because of {}", this,
- feature.getClass().getName(), e.getMessage(), e);
- }
+ if (FeatureApiUtils.apply(getEngineProviders(),
+ feature -> feature.beforeConfigure(this, properties),
+ (feature, ex) -> logger.error("{}: feature {} before-configure failure because of {}", this,
+ feature.getClass().getName(), ex.getMessage(), ex))) {
+ return;
}
this.properties = properties;
@@ -260,16 +258,10 @@ class PolicyEngineManager implements PolicyEngine {
}
/* policy-engine dispatch post configure hook */
- for (final PolicyEngineFeatureApi feature : getEngineProviders()) {
- try {
- if (feature.afterConfigure(this)) {
- return;
- }
- } catch (final Exception e) {
- logger.error("{}: feature {} after-configure failure because of {}", this, feature.getClass().getName(),
- e.getMessage(), e);
- }
- }
+ FeatureApiUtils.apply(getEngineProviders(),
+ feature -> feature.afterConfigure(this),
+ (feature, ex) -> logger.error("{}: feature {} after-configure failure because of {}", this,
+ feature.getClass().getName(), ex.getMessage(), ex));
}
@Override
@@ -337,16 +329,11 @@ class PolicyEngineManager implements PolicyEngine {
}
// feature hook
- for (final PolicyControllerFeatureApi controllerFeature : getControllerProviders()) {
- try {
- if (controllerFeature.afterCreate(controller)) {
- return controller;
- }
- } catch (final Exception e) {
- logger.error("{}: feature {} after-controller-create failure because of {}", this,
- controllerFeature.getClass().getName(), e.getMessage(), e);
- }
- }
+ PolicyController controller2 = controller;
+ FeatureApiUtils.apply(getControllerProviders(),
+ feature -> feature.afterCreate(controller2),
+ (feature, ex) -> logger.error("{}: feature {} after-controller-create failure because of {}",
+ this, feature.getClass().getName(), ex.getMessage(), ex));
return controller;
}
@@ -393,7 +380,6 @@ class PolicyEngineManager implements PolicyEngine {
throw new IllegalArgumentException("No controller configuration has been provided");
}
- PolicyController policyController = null;
try {
final String operation = configController.getOperation();
if (operation == null || operation.isEmpty()) {
@@ -401,75 +387,14 @@ class PolicyEngineManager implements PolicyEngine {
throw new IllegalArgumentException("operation must be provided");
}
- try {
- policyController = getControllerFactory().get(controllerName);
- } catch (final IllegalArgumentException e) {
- // not found
- logger.warn("Policy Controller " + controllerName + " not found", e);
- }
-
+ PolicyController policyController = getController(controllerName);
if (policyController == null) {
-
- if (operation.equalsIgnoreCase(ControllerConfiguration.CONFIG_CONTROLLER_OPERATION_LOCK)
- || operation.equalsIgnoreCase(ControllerConfiguration.CONFIG_CONTROLLER_OPERATION_UNLOCK)) {
- throw new IllegalArgumentException(controllerName + " is not available for operation " + operation);
- }
-
- /* Recovery case */
-
- logger.warn("controller {} does not exist. Attempting recovery from disk", controllerName);
-
- final Properties controllerProperties =
- getPersistenceManager().getControllerProperties(controllerName);
-
- /*
- * returned properties cannot be null (per implementation) assert (properties !=
- * null)
- */
-
- if (controllerProperties == null) {
- throw new IllegalArgumentException(controllerName + " is invalid");
- }
-
- logger.warn("controller being recovered. {} Reset controller's bad maven coordinates to brainless",
- controllerName);
-
- /*
- * try to bring up bad controller in brainless mode, after having it
- * working, apply the new create/update operation.
- */
- controllerProperties.setProperty(DroolsPropertyConstants.RULES_GROUPID,
- DroolsControllerConstants.NO_GROUP_ID);
- controllerProperties.setProperty(DroolsPropertyConstants.RULES_ARTIFACTID,
- DroolsControllerConstants.NO_ARTIFACT_ID);
- controllerProperties.setProperty(DroolsPropertyConstants.RULES_VERSION,
- DroolsControllerConstants.NO_VERSION);
-
- policyController = getPolicyEngine().createPolicyController(controllerName, controllerProperties);
+ policyController = findController(controllerName, operation);
/* fall through to do brain update operation */
}
- switch (operation) {
- case ControllerConfiguration.CONFIG_CONTROLLER_OPERATION_CREATE:
- getControllerFactory().patch(policyController, configController.getDrools());
- break;
- case ControllerConfiguration.CONFIG_CONTROLLER_OPERATION_UPDATE:
- policyController.unlock();
- getControllerFactory().patch(policyController, configController.getDrools());
- break;
- case ControllerConfiguration.CONFIG_CONTROLLER_OPERATION_LOCK:
- policyController.lock();
- break;
- case ControllerConfiguration.CONFIG_CONTROLLER_OPERATION_UNLOCK:
- policyController.unlock();
- break;
- default:
- final String msg = "Controller Operation Configuration is not supported: " + operation + " for "
- + controllerName;
- logger.warn(msg);
- throw new IllegalArgumentException(msg);
- }
+ updateController(controllerName, policyController, operation, configController);
return policyController;
} catch (final Exception e) {
@@ -481,84 +406,135 @@ class PolicyEngineManager implements PolicyEngine {
}
}
+ private PolicyController getController(final String controllerName) {
+ PolicyController policyController = null;
+ try {
+ policyController = getControllerFactory().get(controllerName);
+ } catch (final IllegalArgumentException e) {
+ // not found
+ logger.warn("Policy Controller " + controllerName + " not found", e);
+ }
+ return policyController;
+ }
+
+ private PolicyController findController(final String controllerName, final String operation) {
+ if (operation.equalsIgnoreCase(ControllerConfiguration.CONFIG_CONTROLLER_OPERATION_LOCK)
+ || operation.equalsIgnoreCase(ControllerConfiguration.CONFIG_CONTROLLER_OPERATION_UNLOCK)) {
+ throw new IllegalArgumentException(controllerName + " is not available for operation " + operation);
+ }
+
+ /* Recovery case */
+
+ logger.warn("controller {} does not exist. Attempting recovery from disk", controllerName);
+
+ final Properties controllerProperties =
+ getPersistenceManager().getControllerProperties(controllerName);
+
+ /*
+ * returned properties cannot be null (per implementation) assert (properties !=
+ * null)
+ */
+
+ if (controllerProperties == null) {
+ throw new IllegalArgumentException(controllerName + " is invalid");
+ }
+
+ logger.warn("controller being recovered. {} Reset controller's bad maven coordinates to brainless",
+ controllerName);
+
+ /*
+ * try to bring up bad controller in brainless mode, after having it
+ * working, apply the new create/update operation.
+ */
+ controllerProperties.setProperty(DroolsPropertyConstants.RULES_GROUPID,
+ DroolsControllerConstants.NO_GROUP_ID);
+ controllerProperties.setProperty(DroolsPropertyConstants.RULES_ARTIFACTID,
+ DroolsControllerConstants.NO_ARTIFACT_ID);
+ controllerProperties.setProperty(DroolsPropertyConstants.RULES_VERSION,
+ DroolsControllerConstants.NO_VERSION);
+
+ return getPolicyEngine().createPolicyController(controllerName, controllerProperties);
+ }
+
+ private void updateController(final String controllerName, PolicyController policyController,
+ final String operation, ControllerConfiguration configController) {
+ switch (operation) {
+ case ControllerConfiguration.CONFIG_CONTROLLER_OPERATION_CREATE:
+ getControllerFactory().patch(policyController, configController.getDrools());
+ break;
+ case ControllerConfiguration.CONFIG_CONTROLLER_OPERATION_UPDATE:
+ policyController.unlock();
+ getControllerFactory().patch(policyController, configController.getDrools());
+ break;
+ case ControllerConfiguration.CONFIG_CONTROLLER_OPERATION_LOCK:
+ policyController.lock();
+ break;
+ case ControllerConfiguration.CONFIG_CONTROLLER_OPERATION_UNLOCK:
+ policyController.unlock();
+ break;
+ default:
+ final String msg = "Controller Operation Configuration is not supported: " + operation + " for "
+ + controllerName;
+ logger.warn(msg);
+ throw new IllegalArgumentException(msg);
+ }
+ }
+
@Override
public synchronized boolean start() {
/* policy-engine dispatch pre start hook */
- for (final PolicyEngineFeatureApi feature : getEngineProviders()) {
- try {
- if (feature.beforeStart(this)) {
- return true;
- }
- } catch (final Exception e) {
- logger.error("{}: feature {} before-start failure because of {}", this, feature.getClass().getName(),
- e.getMessage(), e);
- }
+ if (FeatureApiUtils.apply(getEngineProviders(),
+ feature -> feature.beforeStart(this),
+ (feature, ex) -> logger.error("{}: feature {} before-start failure because of {}", this,
+ feature.getClass().getName(), ex.getMessage(), ex))) {
+ return true;
}
- boolean success = true;
if (this.locked) {
throw new IllegalStateException(ENGINE_LOCKED_MSG);
}
this.alive = true;
+ AtomicReference<Boolean> success = new AtomicReference<>(true);
+
/* Start Policy Engine exclusively-owned (unmanaged) http servers */
- for (final HttpServletServer httpServer : this.httpServers) {
- try {
- if (!httpServer.waitedStart(10 * 1000L)) {
- success = false;
- }
- } catch (final Exception e) {
- logger.error("{}: cannot start http-server {} because of {}", this, httpServer, e.getMessage(), e);
- }
- }
+ attempt(success, this.httpServers,
+ httpServer -> httpServer.waitedStart(10 * 1000L),
+ (item, ex) -> logger.error("{}: cannot start http-server {} because of {}",
+ this, item, ex.getMessage(), ex));
/* Start Policy Engine exclusively-owned (unmanaged) sources */
- for (final TopicSource source : this.sources) {
- try {
- if (!source.start()) {
- success = false;
- }
- } catch (final Exception e) {
- logger.error("{}: cannot start topic-source {} because of {}", this, source, e.getMessage(), e);
- }
- }
+ attempt(success, this.sources,
+ TopicSource::start,
+ (item, ex) -> logger.error("{}: cannot start topic-source {} because of {}",
+ this, item, ex.getMessage(), ex));
/* Start Policy Engine owned (unmanaged) sinks */
- for (final TopicSink sink : this.sinks) {
- try {
- if (!sink.start()) {
- success = false;
- }
- } catch (final Exception e) {
- logger.error("{}: cannot start topic-sink {} because of {}", this, sink, e.getMessage(), e);
- }
- }
+ attempt(success, this.sinks,
+ TopicSink::start,
+ (item, ex) -> logger.error("{}: cannot start topic-sink {} because of {}",
+ this, item, ex.getMessage(), ex));
/* Start Policy Controllers */
- final List<PolicyController> controllers = getControllerFactory().inventory();
- for (final PolicyController controller : controllers) {
- try {
- if (!controller.start()) {
- success = false;
- }
- } catch (final Exception e) {
- logger.error("{}: cannot start policy-controller {} because of {}", this, controller, e.getMessage(),
- e);
- success = false;
- }
- }
+ attempt(success, getControllerFactory().inventory(),
+ PolicyController::start,
+ (item, ex) -> {
+ logger.error("{}: cannot start policy-controller {} because of {}", this, item,
+ ex.getMessage(), ex);
+ success.set(false);
+ });
/* Start managed Topic Endpoints */
try {
if (!getTopicEndpointManager().start()) {
- success = false;
+ success.set(false);
}
} catch (final IllegalStateException e) {
logger.warn("{}: Topic Endpoint Manager is in an invalid state because of {}", this, e.getMessage(), e);
@@ -570,109 +546,80 @@ class PolicyEngineManager implements PolicyEngine {
startPdpJmxListener();
/* policy-engine dispatch after start hook */
- for (final PolicyEngineFeatureApi feature : getEngineProviders()) {
- try {
- if (feature.afterStart(this)) {
- return success;
- }
- } catch (final Exception e) {
- logger.error("{}: feature {} after-start failure because of {}", this, feature.getClass().getName(),
- e.getMessage(), e);
- }
- }
+ FeatureApiUtils.apply(getEngineProviders(),
+ feature -> feature.afterStart(this),
+ (feature, ex) -> logger.error("{}: feature {} after-start failure because of {}", this,
+ feature.getClass().getName(), ex.getMessage(), ex));
- return success;
+ return success.get();
+ }
+
+ @FunctionalInterface
+ private static interface PredicateWithEx<T> {
+ public boolean test(T value) throws InterruptedException;
}
@Override
public synchronized boolean stop() {
/* policy-engine dispatch pre stop hook */
- for (final PolicyEngineFeatureApi feature : getEngineProviders()) {
- try {
- if (feature.beforeStop(this)) {
- return true;
- }
- } catch (final Exception e) {
- logger.error("{}: feature {} before-stop failure because of {}", this, feature.getClass().getName(),
- e.getMessage(), e);
- }
+ if (FeatureApiUtils.apply(getEngineProviders(),
+ feature -> feature.beforeStop(this),
+ (feature, ex) -> logger.error("{}: feature {} before-stop failure because of {}", this,
+ feature.getClass().getName(), ex.getMessage(), ex))) {
+ return true;
}
/* stop regardless of the lock state */
- boolean success = true;
if (!this.alive) {
return true;
}
this.alive = false;
- final List<PolicyController> controllers = getControllerFactory().inventory();
- for (final PolicyController controller : controllers) {
- try {
- if (!controller.stop()) {
- success = false;
- }
- } catch (final Exception e) {
- logger.error("{}: cannot stop policy-controller {} because of {}", this, controller, e.getMessage(), e);
- success = false;
- }
- }
+ AtomicReference<Boolean> success = new AtomicReference<>(true);
+
+ attempt(success, getControllerFactory().inventory(),
+ PolicyController::stop,
+ (item, ex) -> {
+ logger.error("{}: cannot stop policy-controller {} because of {}", this, item,
+ ex.getMessage(), ex);
+ success.set(false);
+ });
/* Stop Policy Engine owned (unmanaged) sources */
- for (final TopicSource source : this.sources) {
- try {
- if (!source.stop()) {
- success = false;
- }
- } catch (final Exception e) {
- logger.error("{}: cannot start topic-source {} because of {}", this, source, e.getMessage(), e);
- }
- }
+ attempt(success, this.sources,
+ TopicSource::stop,
+ (item, ex) -> logger.error("{}: cannot stop topic-source {} because of {}", this, item,
+ ex.getMessage(), ex));
/* Stop Policy Engine owned (unmanaged) sinks */
- for (final TopicSink sink : this.sinks) {
- try {
- if (!sink.stop()) {
- success = false;
- }
- } catch (final Exception e) {
- logger.error("{}: cannot start topic-sink {} because of {}", this, sink, e.getMessage(), e);
- }
- }
+ attempt(success, this.sinks,
+ TopicSink::stop,
+ (item, ex) -> logger.error("{}: cannot stop topic-sink {} because of {}", this, item,
+ ex.getMessage(), ex));
/* stop all managed topics sources and sinks */
if (!getTopicEndpointManager().stop()) {
- success = false;
+ success.set(false);
}
/* stop all unmanaged http servers */
- for (final HttpServletServer httpServer : this.httpServers) {
- try {
- if (!httpServer.stop()) {
- success = false;
- }
- } catch (final Exception e) {
- logger.error("{}: cannot start http-server {} because of {}", this, httpServer, e.getMessage(), e);
- }
- }
+ attempt(success, this.httpServers,
+ HttpServletServer::stop,
+ (item, ex) -> logger.error("{}: cannot stop http-server {} because of {}", this, item,
+ ex.getMessage(), ex));
// stop JMX?
/* policy-engine dispatch pre stop hook */
- for (final PolicyEngineFeatureApi feature : getEngineProviders()) {
- try {
- if (feature.afterStop(this)) {
- return success;
- }
- } catch (final Exception e) {
- logger.error("{}: feature {} after-stop failure because of {}", this, feature.getClass().getName(),
- e.getMessage(), e);
- }
- }
+ FeatureApiUtils.apply(getEngineProviders(),
+ feature -> feature.afterStop(this),
+ (feature, ex) -> logger.error("{}: feature {} after-stop failure because of {}", this,
+ feature.getClass().getName(), ex.getMessage(), ex));
- return success;
+ return success.get();
}
@Override
@@ -687,36 +634,26 @@ class PolicyEngineManager implements PolicyEngine {
exitThread.start();
/* policy-engine dispatch pre shutdown hook */
- for (final PolicyEngineFeatureApi feature : getEngineProviders()) {
- try {
- if (feature.beforeShutdown(this)) {
- return;
- }
- } catch (final Exception e) {
- logger.error("{}: feature {} before-shutdown failure because of {}", this, feature.getClass().getName(),
- e.getMessage(), e);
- }
+ if (FeatureApiUtils.apply(getEngineProviders(),
+ feature -> feature.beforeShutdown(this),
+ (feature, ex) -> logger.error("{}: feature {} before-shutdown failure because of {}", this,
+ feature.getClass().getName(), ex.getMessage(), ex))) {
+ return;
}
this.alive = false;
/* Shutdown Policy Engine owned (unmanaged) sources */
- for (final TopicSource source : this.sources) {
- try {
- source.shutdown();
- } catch (final Exception e) {
- logger.error("{}: cannot shutdown topic-source {} because of {}", this, source, e.getMessage(), e);
- }
- }
+ applyAll(this.sources,
+ TopicSource::shutdown,
+ (item, ex) -> logger.error("{}: cannot shutdown topic-source {} because of {}", this, item,
+ ex.getMessage(), ex));
/* Shutdown Policy Engine owned (unmanaged) sinks */
- for (final TopicSink sink : this.sinks) {
- try {
- sink.shutdown();
- } catch (final Exception e) {
- logger.error("{}: cannot shutdown topic-sink {} because of {}", this, sink, e.getMessage(), e);
- }
- }
+ applyAll(this.sinks,
+ TopicSink::shutdown,
+ (item, ex) -> logger.error("{}: cannot shutdown topic-sink {} because of {}", this, item,
+ ex.getMessage(), ex));
/* Shutdown managed resources */
getControllerFactory().shutdown();
@@ -728,19 +665,45 @@ class PolicyEngineManager implements PolicyEngine {
stopPdpJmxListener();
/* policy-engine dispatch post shutdown hook */
- for (final PolicyEngineFeatureApi feature : getEngineProviders()) {
+ FeatureApiUtils.apply(getEngineProviders(),
+ feature -> feature.afterShutdown(this),
+ (feature, ex) -> logger.error("{}: feature {} after-shutdown failure because of {}", this,
+ feature.getClass().getName(), ex.getMessage(), ex));
+
+ exitThread.interrupt();
+ logger.info("{}: normal termination", this);
+ }
+
+ private <T> void attempt(AtomicReference<Boolean> success, List<T> items, PredicateWithEx<T> pred,
+ BiConsumer<T,Exception> handleEx) {
+
+ for (T item : items) {
try {
- if (feature.afterShutdown(this)) {
- return;
+ if (!pred.test(item)) {
+ success.set(false);
}
- } catch (final Exception e) {
- logger.error("{}: feature {} after-shutdown failure because of {}", this, feature.getClass().getName(),
- e.getMessage(), e);
+
+ } catch (InterruptedException ex) {
+ handleEx.accept(item, ex);
+ Thread.currentThread().interrupt();
+
+ } catch (RuntimeException ex) {
+ handleEx.accept(item, ex);
}
}
+ }
- exitThread.interrupt();
- logger.info("{}: normal termination", this);
+ private <T> void applyAll(List<T> items, Consumer<T> function,
+ BiConsumer<T,Exception> handleEx) {
+
+ for (T item : items) {
+ try {
+ function.accept(item);
+
+ } catch (RuntimeException ex) {
+ handleEx.accept(item, ex);
+ }
+ }
}
/**
@@ -764,14 +727,10 @@ class PolicyEngineManager implements PolicyEngine {
/*
* shut down the Policy Engine owned http servers as the very last thing
*/
- for (final HttpServletServer httpServer : PolicyEngineManager.this.getHttpServers()) {
- try {
- httpServer.shutdown();
- } catch (final Exception e) {
- logger.error("{}: cannot shutdown http-server {} because of {}", PolicyEngineManager.this,
- httpServer, e.getMessage(), e);
- }
- }
+ applyAll(PolicyEngineManager.this.getHttpServers(),
+ HttpServletServer::shutdown,
+ (item, ex) -> logger.error("{}: cannot shutdown http-server {} because of {}", this, item,
+ ex.getMessage(), ex));
logger.info("{}: exit", PolicyEngineManager.this);
doExit(0);
@@ -790,23 +749,14 @@ class PolicyEngineManager implements PolicyEngine {
}
@Override
- public boolean isAlive() {
- return this.alive;
- }
-
- @Override
public synchronized boolean lock() {
/* policy-engine dispatch pre lock hook */
- for (final PolicyEngineFeatureApi feature : getEngineProviders()) {
- try {
- if (feature.beforeLock(this)) {
- return true;
- }
- } catch (final Exception e) {
- logger.error("{}: feature {} before-lock failure because of {}", this, feature.getClass().getName(),
- e.getMessage(), e);
- }
+ if (FeatureApiUtils.apply(getEngineProviders(),
+ feature -> feature.beforeLock(this),
+ (feature, ex) -> logger.error("{}: feature {} before-lock failure because of {}", this,
+ feature.getClass().getName(), ex.getMessage(), ex))) {
+ return true;
}
if (this.locked) {
@@ -829,16 +779,10 @@ class PolicyEngineManager implements PolicyEngine {
success = getTopicEndpointManager().lock() && success;
/* policy-engine dispatch post lock hook */
- for (final PolicyEngineFeatureApi feature : getEngineProviders()) {
- try {
- if (feature.afterLock(this)) {
- return success;
- }
- } catch (final Exception e) {
- logger.error("{}: feature {} after-lock failure because of {}", this, feature.getClass().getName(),
- e.getMessage(), e);
- }
- }
+ FeatureApiUtils.apply(getEngineProviders(),
+ feature -> feature.afterLock(this),
+ (feature, ex) -> logger.error("{}: feature {} after-lock failure because of {}", this,
+ feature.getClass().getName(), ex.getMessage(), ex));
return success;
}
@@ -847,15 +791,11 @@ class PolicyEngineManager implements PolicyEngine {
public synchronized boolean unlock() {
/* policy-engine dispatch pre unlock hook */
- for (final PolicyEngineFeatureApi feature : getEngineProviders()) {
- try {
- if (feature.beforeUnlock(this)) {
- return true;
- }
- } catch (final Exception e) {
- logger.error("{}: feature {} before-unlock failure because of {}", this, feature.getClass().getName(),
- e.getMessage(), e);
- }
+ if (FeatureApiUtils.apply(getEngineProviders(),
+ feature -> feature.beforeUnlock(this),
+ (feature, ex) -> logger.error("{}: feature {} before-unlock failure because of {}", this,
+ feature.getClass().getName(), ex.getMessage(), ex))) {
+ return true;
}
if (!this.locked) {
@@ -879,26 +819,15 @@ class PolicyEngineManager implements PolicyEngine {
success = getTopicEndpointManager().unlock() && success;
/* policy-engine dispatch after unlock hook */
- for (final PolicyEngineFeatureApi feature : getEngineProviders()) {
- try {
- if (feature.afterUnlock(this)) {
- return success;
- }
- } catch (final Exception e) {
- logger.error("{}: feature {} after-unlock failure because of {}", this, feature.getClass().getName(),
- e.getMessage(), e);
- }
- }
+ FeatureApiUtils.apply(getEngineProviders(),
+ feature -> feature.afterUnlock(this),
+ (feature, ex) -> logger.error("{}: feature {} after-unlock failure because of {}", this,
+ feature.getClass().getName(), ex.getMessage(), ex));
return success;
}
@Override
- public boolean isLocked() {
- return this.locked;
- }
-
- @Override
public void removePolicyController(String name) {
getControllerFactory().destroy(name);
}
@@ -933,24 +862,6 @@ class PolicyEngineManager implements PolicyEngine {
return this.properties;
}
-
- @SuppressWarnings("unchecked")
- @Override
- public List<TopicSource> getSources() {
- return (List<TopicSource>) this.sources;
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public List<TopicSink> getSinks() {
- return (List<TopicSink>) this.sinks;
- }
-
- @Override
- public List<HttpServletServer> getHttpServers() {
- return this.httpServers;
- }
-
@Override
public List<String> getFeatures() {
final List<String> features = new ArrayList<>();
@@ -985,15 +896,12 @@ class PolicyEngineManager implements PolicyEngine {
@Override
public void onTopicEvent(CommInfrastructure commType, String topic, String event) {
/* policy-engine pre topic event hook */
- for (final PolicyEngineFeatureApi feature : getFeatureProviders()) {
- try {
- if (feature.beforeOnTopicEvent(this, commType, topic, event)) {
- return;
- }
- } catch (final Exception e) {
- logger.error("{}: feature {} beforeOnTopicEvent failure on event {} because of {}", this,
- feature.getClass().getName(), event, e.getMessage(), e);
- }
+ if (FeatureApiUtils.apply(getFeatureProviders(),
+ feature -> feature.beforeOnTopicEvent(this, commType, topic, event),
+ (feature, ex) -> logger.error(
+ "{}: feature {} beforeOnTopicEvent failure on event {} because of {}", this,
+ feature.getClass().getName(), event, ex.getMessage(), ex))) {
+ return;
}
/* configuration request */
@@ -1006,16 +914,11 @@ class PolicyEngineManager implements PolicyEngine {
}
/* policy-engine after topic event hook */
- for (final PolicyEngineFeatureApi feature : getFeatureProviders()) {
- try {
- if (feature.afterOnTopicEvent(this, configuration, commType, topic, event)) {
- return;
- }
- } catch (final Exception e) {
- logger.error("{}: feature {} afterOnTopicEvent failure on event {} because of {}", this,
- feature.getClass().getName(), event, e.getMessage(), e);
- }
- }
+ PdpdConfiguration configuration2 = configuration;
+ FeatureApiUtils.apply(getFeatureProviders(),
+ feature -> feature.afterOnTopicEvent(this, configuration2, commType, topic, event),
+ (feature, ex) -> logger.error("{}: feature {} afterOnTopicEvent failure on event {} because of {}", this,
+ feature.getClass().getName(), event, ex.getMessage(), ex));
}
@Override
@@ -1041,7 +944,7 @@ class PolicyEngineManager implements PolicyEngine {
throw new IllegalStateException(ENGINE_LOCKED_MSG);
}
- final List<? extends TopicSink> topicSinks = getTopicEndpointManager().getTopicSinks(topic);
+ final List<TopicSink> topicSinks = getTopicEndpointManager().getTopicSinks(topic);
if (topicSinks == null || topicSinks.size() != 1) {
throw new IllegalStateException("Cannot ensure correct delivery on topic " + topic + ": " + topicSinks);
}
@@ -1056,11 +959,11 @@ class PolicyEngineManager implements PolicyEngine {
* Note this entry point is usually from the DRL (one of the reasons busType is String.
*/
- if (busType == null || busType.isEmpty()) {
+ if (StringUtils.isBlank(busType)) {
throw new IllegalArgumentException("Invalid Communication Infrastructure");
}
- if (topic == null || topic.isEmpty()) {
+ if (StringUtils.isBlank(topic)) {
throw new IllegalArgumentException(INVALID_TOPIC_MSG);
}
@@ -1068,12 +971,8 @@ class PolicyEngineManager implements PolicyEngine {
throw new IllegalArgumentException(INVALID_EVENT_MSG);
}
- boolean valid = false;
- for (final Topic.CommInfrastructure comm : Topic.CommInfrastructure.values()) {
- if (comm.name().equals(busType)) {
- valid = true;
- }
- }
+ boolean valid = Stream.of(Topic.CommInfrastructure.values()).map(Enum::name)
+ .anyMatch(name -> name.equals(busType));
if (!valid) {
throw new IllegalArgumentException("Invalid Communication Infrastructure: " + busType);
@@ -1181,15 +1080,11 @@ class PolicyEngineManager implements PolicyEngine {
public synchronized void activate() {
/* policy-engine dispatch pre activate hook */
- for (final PolicyEngineFeatureApi feature : getEngineProviders()) {
- try {
- if (feature.beforeActivate(this)) {
- return;
- }
- } catch (final Exception e) {
- logger.error("{}: feature {} before-activate failure because of {}", this, feature.getClass().getName(),
- e.getMessage(), e);
- }
+ if (FeatureApiUtils.apply(getEngineProviders(),
+ feature -> feature.beforeActivate(this),
+ (feature, ex) -> logger.error("{}: feature {} before-activate failure because of {}", this,
+ feature.getClass().getName(), ex.getMessage(), ex))) {
+ return;
}
// activate 'policy-management'
@@ -1209,31 +1104,21 @@ class PolicyEngineManager implements PolicyEngine {
this.unlock();
/* policy-engine dispatch post activate hook */
- for (final PolicyEngineFeatureApi feature : getEngineProviders()) {
- try {
- if (feature.afterActivate(this)) {
- return;
- }
- } catch (final Exception e) {
- logger.error("{}: feature {} after-activate failure because of {}", this, feature.getClass().getName(),
- e.getMessage(), e);
- }
- }
+ FeatureApiUtils.apply(getEngineProviders(),
+ feature -> feature.afterActivate(this),
+ (feature, ex) -> logger.error("{}: feature {} after-activate failure because of {}", this,
+ feature.getClass().getName(), ex.getMessage(), ex));
}
@Override
public synchronized void deactivate() {
/* policy-engine dispatch pre deactivate hook */
- for (final PolicyEngineFeatureApi feature : getEngineProviders()) {
- try {
- if (feature.beforeDeactivate(this)) {
- return;
- }
- } catch (final Exception e) {
- logger.error("{}: feature {} before-deactivate failure because of {}", this,
- feature.getClass().getName(), e.getMessage(), e);
- }
+ if (FeatureApiUtils.apply(getEngineProviders(),
+ feature -> feature.beforeDeactivate(this),
+ (feature, ex) -> logger.error("{}: feature {} before-deactivate failure because of {}", this,
+ feature.getClass().getName(), ex.getMessage(), ex))) {
+ return;
}
this.lock();
@@ -1248,16 +1133,10 @@ class PolicyEngineManager implements PolicyEngine {
}
/* policy-engine dispatch post deactivate hook */
- for (final PolicyEngineFeatureApi feature : getEngineProviders()) {
- try {
- if (feature.afterDeactivate(this)) {
- return;
- }
- } catch (final Exception e) {
- logger.error("{}: feature {} after-deactivate failure because of {}", this,
- feature.getClass().getName(), e.getMessage(), e);
- }
- }
+ FeatureApiUtils.apply(getEngineProviders(),
+ feature -> feature.afterDeactivate(this),
+ (feature, ex) -> logger.error("{}: feature {} after-deactivate failure because of {}", this,
+ feature.getClass().getName(), ex.getMessage(), ex));
}
private boolean controllerConfig(PdpdConfiguration config) {
@@ -1269,12 +1148,8 @@ class PolicyEngineManager implements PolicyEngine {
}
final List<PolicyController> policyControllers = this.updatePolicyControllers(config.getControllers());
- boolean success = false;
- if (!(policyControllers == null || policyControllers.isEmpty())
- && (policyControllers.size() == configControllers.size())) {
- success = true;
- }
- return success;
+ return (policyControllers != null && !policyControllers.isEmpty()
+ && policyControllers.size() == configControllers.size());
}
@Override
diff --git a/policy-management/src/main/java/org/onap/policy/drools/system/internal/AggregatedPolicyController.java b/policy-management/src/main/java/org/onap/policy/drools/system/internal/AggregatedPolicyController.java
index 5d915104..aa57abaf 100644
--- a/policy-management/src/main/java/org/onap/policy/drools/system/internal/AggregatedPolicyController.java
+++ b/policy-management/src/main/java/org/onap/policy/drools/system/internal/AggregatedPolicyController.java
@@ -34,6 +34,7 @@ import org.onap.policy.common.endpoints.event.comm.TopicListener;
import org.onap.policy.common.endpoints.event.comm.TopicSink;
import org.onap.policy.common.endpoints.event.comm.TopicSource;
import org.onap.policy.common.gson.annotation.GsonJsonIgnore;
+import org.onap.policy.common.utils.services.FeatureApiUtils;
import org.onap.policy.drools.controller.DroolsController;
import org.onap.policy.drools.controller.DroolsControllerConstants;
import org.onap.policy.drools.controller.DroolsControllerFactory;
@@ -54,6 +55,9 @@ import org.slf4j.LoggerFactory;
*/
public class AggregatedPolicyController implements PolicyController, TopicListener {
+ private static final String BEFORE_OFFER_FAILURE = "{}: feature {} before-offer failure because of {}";
+ private static final String AFTER_OFFER_FAILURE = "{}: feature {} after-offer failure because of {}";
+
/**
* Logger.
*/
@@ -67,12 +71,12 @@ public class AggregatedPolicyController implements PolicyController, TopicListen
/**
* Abstracted Event Sources List regardless communication technology.
*/
- private final List<? extends TopicSource> sources;
+ private final List<TopicSource> sources;
/**
* Abstracted Event Sinks List regardless communication technology.
*/
- private final List<? extends TopicSink> sinks;
+ private final List<TopicSink> sinks;
/**
* Mapping topics to sinks.
@@ -273,15 +277,11 @@ public class AggregatedPolicyController implements PolicyController, TopicListen
public boolean start() {
logger.info("{}: start", this);
- for (PolicyControllerFeatureApi feature : getProviders()) {
- try {
- if (feature.beforeStart(this)) {
- return true;
- }
- } catch (Exception e) {
- logger.error("{}: feature {} before-start failure because of {}", this, feature.getClass().getName(),
- e.getMessage(), e);
- }
+ if (FeatureApiUtils.apply(getProviders(),
+ feature -> feature.beforeStart(this),
+ (feature, ex) -> logger.error("{}: feature {} before-start failure because of {}", this,
+ feature.getClass().getName(), ex.getMessage(), ex))) {
+ return true;
}
if (this.isLocked()) {
@@ -312,16 +312,10 @@ public class AggregatedPolicyController implements PolicyController, TopicListen
}
}
- for (PolicyControllerFeatureApi feature : getProviders()) {
- try {
- if (feature.afterStart(this)) {
- return true;
- }
- } catch (Exception e) {
- logger.error("{}: feature {} after-start failure because of {}", this, feature.getClass().getName(),
- e.getMessage(), e);
- }
- }
+ FeatureApiUtils.apply(getProviders(),
+ feature -> feature.afterStart(this),
+ (feature, ex) -> logger.error("{}: feature {} after-start failure because of {}", this,
+ feature.getClass().getName(), ex.getMessage(), ex));
return success;
}
@@ -333,15 +327,11 @@ public class AggregatedPolicyController implements PolicyController, TopicListen
public boolean stop() {
logger.info("{}: stop", this);
- for (PolicyControllerFeatureApi feature : getProviders()) {
- try {
- if (feature.beforeStop(this)) {
- return true;
- }
- } catch (Exception e) {
- logger.error("{}: feature {} before-stop failure because of {}", this, feature.getClass().getName(),
- e.getMessage(), e);
- }
+ if (FeatureApiUtils.apply(getProviders(),
+ feature -> feature.beforeStop(this),
+ (feature, ex) -> logger.error("{}: feature {} before-stop failure because of {}", this,
+ feature.getClass().getName(), ex.getMessage(), ex))) {
+ return true;
}
/* stop regardless locked state */
@@ -362,16 +352,10 @@ public class AggregatedPolicyController implements PolicyController, TopicListen
boolean success = this.droolsController.stop();
- for (PolicyControllerFeatureApi feature : getProviders()) {
- try {
- if (feature.afterStop(this)) {
- return true;
- }
- } catch (Exception e) {
- logger.error("{}: feature {} after-stop failure because of {}", this, feature.getClass().getName(),
- e.getMessage(), e);
- }
- }
+ FeatureApiUtils.apply(getProviders(),
+ feature -> feature.afterStop(this),
+ (feature, ex) -> logger.error("{}: feature {} after-stop failure because of {}", this,
+ feature.getClass().getName(), ex.getMessage(), ex));
return success;
}
@@ -383,31 +367,21 @@ public class AggregatedPolicyController implements PolicyController, TopicListen
public void shutdown() {
logger.info("{}: shutdown", this);
- for (PolicyControllerFeatureApi feature : getProviders()) {
- try {
- if (feature.beforeShutdown(this)) {
- return;
- }
- } catch (Exception e) {
- logger.error("{}: feature {} before-shutdown failure because of {}", this, feature.getClass().getName(),
- e.getMessage(), e);
- }
+ if (FeatureApiUtils.apply(getProviders(),
+ feature -> feature.beforeShutdown(this),
+ (feature, ex) -> logger.error("{}: feature {} before-shutdown failure because of {}", this,
+ feature.getClass().getName(), ex.getMessage(), ex))) {
+ return;
}
this.stop();
getDroolsFactory().shutdown(this.droolsController);
- for (PolicyControllerFeatureApi feature : getProviders()) {
- try {
- if (feature.afterShutdown(this)) {
- return;
- }
- } catch (Exception e) {
- logger.error("{}: feature {} after-shutdown failure because of {}", this, feature.getClass().getName(),
- e.getMessage(), e);
- }
- }
+ FeatureApiUtils.apply(getProviders(),
+ feature -> feature.afterShutdown(this),
+ (feature, ex) -> logger.error("{}: feature {} after-shutdown failure because of {}", this,
+ feature.getClass().getName(), ex.getMessage(), ex));
}
/**
@@ -417,31 +391,21 @@ public class AggregatedPolicyController implements PolicyController, TopicListen
public void halt() {
logger.info("{}: halt", this);
- for (PolicyControllerFeatureApi feature : getProviders()) {
- try {
- if (feature.beforeHalt(this)) {
- return;
- }
- } catch (Exception e) {
- logger.error("{}: feature {} before-halt failure because of {}", this, feature.getClass().getName(),
- e.getMessage(), e);
- }
+ if (FeatureApiUtils.apply(getProviders(),
+ feature -> feature.beforeHalt(this),
+ (feature, ex) -> logger.error("{}: feature {} before-halt failure because of {}", this,
+ feature.getClass().getName(), ex.getMessage(), ex))) {
+ return;
}
this.stop();
getDroolsFactory().destroy(this.droolsController);
getPersistenceManager().deleteController(this.name);
- for (PolicyControllerFeatureApi feature : getProviders()) {
- try {
- if (feature.afterHalt(this)) {
- return;
- }
- } catch (Exception e) {
- logger.error("{}: feature {} after-halt failure because of {}", this, feature.getClass().getName(),
- e.getMessage(), e);
- }
- }
+ FeatureApiUtils.apply(getProviders(),
+ feature -> feature.afterHalt(this),
+ (feature, ex) -> logger.error("{}: feature {} after-halt failure because of {}", this,
+ feature.getClass().getName(), ex.getMessage(), ex));
}
/**
@@ -455,29 +419,19 @@ public class AggregatedPolicyController implements PolicyController, TopicListen
return;
}
- for (PolicyControllerFeatureApi feature : getProviders()) {
- try {
- if (feature.beforeOffer(this, commType, topic, event)) {
- return;
- }
- } catch (Exception e) {
- logger.error("{}: feature {} before-offer failure because of {}", this, feature.getClass().getName(),
- e.getMessage(), e);
- }
+ if (FeatureApiUtils.apply(getProviders(),
+ feature -> feature.beforeOffer(this, commType, topic, event),
+ (feature, ex) -> logger.error(BEFORE_OFFER_FAILURE, this,
+ feature.getClass().getName(), ex.getMessage(), ex))) {
+ return;
}
boolean success = this.droolsController.offer(topic, event);
- for (PolicyControllerFeatureApi feature : getProviders()) {
- try {
- if (feature.afterOffer(this, commType, topic, event, success)) {
- return;
- }
- } catch (Exception e) {
- logger.error("{}: feature {} after-offer failure because of {}", this, feature.getClass().getName(),
- e.getMessage(), e);
- }
- }
+ FeatureApiUtils.apply(getProviders(),
+ feature -> feature.afterOffer(this, commType, topic, event, success),
+ (feature, ex) -> logger.error(AFTER_OFFER_FAILURE, this,
+ feature.getClass().getName(), ex.getMessage(), ex));
}
@Override
@@ -488,29 +442,19 @@ public class AggregatedPolicyController implements PolicyController, TopicListen
return true;
}
- for (PolicyControllerFeatureApi feature : getProviders()) {
- try {
- if (feature.beforeOffer(this, event)) {
- return true;
- }
- } catch (Exception e) {
- logger.error("{}: feature {} before-offer failure because of {}", this, feature.getClass().getName(),
- e.getMessage(), e);
- }
+ if (FeatureApiUtils.apply(getProviders(),
+ feature -> feature.beforeOffer(this, event),
+ (feature, ex) -> logger.error(BEFORE_OFFER_FAILURE, this,
+ feature.getClass().getName(), ex.getMessage(), ex))) {
+ return true;
}
boolean success = this.droolsController.offer(event);
- for (PolicyControllerFeatureApi feature : getProviders()) {
- try {
- if (feature.afterOffer(this, event, success)) {
- return success;
- }
- } catch (Exception e) {
- logger.error("{}: feature {} after-offer failure because of {}", this, feature.getClass().getName(),
- e.getMessage(), e);
- }
- }
+ FeatureApiUtils.apply(getProviders(),
+ feature -> feature.afterOffer(this, event, success),
+ (feature, ex) -> logger.error(AFTER_OFFER_FAILURE, this,
+ feature.getClass().getName(), ex.getMessage(), ex));
return success;
}
@@ -527,15 +471,11 @@ public class AggregatedPolicyController implements PolicyController, TopicListen
logger.debug("{}: deliver event to {}:{}: {}", this, commType, topic, event);
- for (PolicyControllerFeatureApi feature : getProviders()) {
- try {
- if (feature.beforeDeliver(this, commType, topic, event)) {
- return true;
- }
- } catch (Exception e) {
- logger.error("{}: feature {} before-deliver failure because of {}", this, feature.getClass().getName(),
- e.getMessage(), e);
- }
+ if (FeatureApiUtils.apply(getProviders(),
+ feature -> feature.beforeDeliver(this, commType, topic, event),
+ (feature, ex) -> logger.error("{}: feature {} before-deliver failure because of {}", this,
+ feature.getClass().getName(), ex.getMessage(), ex))) {
+ return true;
}
if (topic == null || topic.isEmpty()) {
@@ -562,16 +502,10 @@ public class AggregatedPolicyController implements PolicyController, TopicListen
boolean success = this.droolsController.deliver(this.topic2Sinks.get(topic), event);
- for (PolicyControllerFeatureApi feature : getProviders()) {
- try {
- if (feature.afterDeliver(this, commType, topic, event, success)) {
- return success;
- }
- } catch (Exception e) {
- logger.error("{}: feature {} after-deliver failure because of {}", this, feature.getClass().getName(),
- e.getMessage(), e);
- }
- }
+ FeatureApiUtils.apply(getProviders(),
+ feature -> feature.afterDeliver(this, commType, topic, event, success),
+ (feature, ex) -> logger.error("{}: feature {} after-deliver failure because of {}", this,
+ feature.getClass().getName(), ex.getMessage(), ex));
return success;
}
@@ -591,15 +525,11 @@ public class AggregatedPolicyController implements PolicyController, TopicListen
public boolean lock() {
logger.info("{}: lock", this);
- for (PolicyControllerFeatureApi feature : getProviders()) {
- try {
- if (feature.beforeLock(this)) {
- return true;
- }
- } catch (Exception e) {
- logger.error("{}: feature {} before-lock failure because of {}", this, feature.getClass().getName(),
- e.getMessage(), e);
- }
+ if (FeatureApiUtils.apply(getProviders(),
+ feature -> feature.beforeLock(this),
+ (feature, ex) -> logger.error("{}: feature {} before-lock failure because of {}", this,
+ feature.getClass().getName(), ex.getMessage(), ex))) {
+ return true;
}
synchronized (this) {
@@ -615,16 +545,10 @@ public class AggregatedPolicyController implements PolicyController, TopicListen
boolean success = this.droolsController.lock();
- for (PolicyControllerFeatureApi feature : getProviders()) {
- try {
- if (feature.afterLock(this)) {
- return true;
- }
- } catch (Exception e) {
- logger.error("{}: feature {} after-lock failure because of {}", this, feature.getClass().getName(),
- e.getMessage(), e);
- }
- }
+ FeatureApiUtils.apply(getProviders(),
+ feature -> feature.afterLock(this),
+ (feature, ex) -> logger.error("{}: feature {} after-lock failure because of {}", this,
+ feature.getClass().getName(), ex.getMessage(), ex));
return success;
}
@@ -637,15 +561,11 @@ public class AggregatedPolicyController implements PolicyController, TopicListen
logger.info("{}: unlock", this);
- for (PolicyControllerFeatureApi feature : getProviders()) {
- try {
- if (feature.beforeUnlock(this)) {
- return true;
- }
- } catch (Exception e) {
- logger.error("{}: feature {} before-unlock failure because of {}", this, feature.getClass().getName(),
- e.getMessage(), e);
- }
+ if (FeatureApiUtils.apply(getProviders(),
+ feature -> feature.beforeUnlock(this),
+ (feature, ex) -> logger.error("{}: feature {} before-unlock failure because of {}", this,
+ feature.getClass().getName(), ex.getMessage(), ex))) {
+ return true;
}
synchronized (this) {
@@ -658,16 +578,10 @@ public class AggregatedPolicyController implements PolicyController, TopicListen
boolean success = this.droolsController.unlock();
- for (PolicyControllerFeatureApi feature : getProviders()) {
- try {
- if (feature.afterUnlock(this)) {
- return true;
- }
- } catch (Exception e) {
- logger.error("{}: feature {} after-unlock failure because of {}", this, feature.getClass().getName(),
- e.getMessage(), e);
- }
- }
+ FeatureApiUtils.apply(getProviders(),
+ feature -> feature.afterUnlock(this),
+ (feature, ex) -> logger.error("{}: feature {} after-unlock failure because of {}", this,
+ feature.getClass().getName(), ex.getMessage(), ex));
return success;
}
@@ -684,7 +598,7 @@ public class AggregatedPolicyController implements PolicyController, TopicListen
* {@inheritDoc}.
*/
@Override
- public List<? extends TopicSource> getTopicSources() {
+ public List<TopicSource> getTopicSources() {
return this.sources;
}
@@ -692,7 +606,7 @@ public class AggregatedPolicyController implements PolicyController, TopicListen
* {@inheritDoc}.
*/
@Override
- public List<? extends TopicSink> getTopicSinks() {
+ public List<TopicSink> getTopicSinks() {
return this.sinks;
}
diff --git a/policy-management/src/test/java/org/onap/policy/drools/controller/internal/MavenDroolsControllerTest.java b/policy-management/src/test/java/org/onap/policy/drools/controller/internal/MavenDroolsControllerTest.java
index 4c262775..0d8bdfab 100644
--- a/policy-management/src/test/java/org/onap/policy/drools/controller/internal/MavenDroolsControllerTest.java
+++ b/policy-management/src/test/java/org/onap/policy/drools/controller/internal/MavenDroolsControllerTest.java
@@ -22,7 +22,10 @@ package org.onap.policy.drools.controller.internal;
import java.io.IOException;
import java.nio.file.Paths;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import org.junit.Assert;
+import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.kie.api.builder.ReleaseId;
@@ -42,13 +45,15 @@ public class MavenDroolsControllerTest {
private static volatile ReleaseId releaseId;
+ private static volatile CountDownLatch running;
+
/**
* Set up.
*
* @throws IOException throws an IO exception
*/
@BeforeClass
- public static void setUp() throws IOException {
+ public static void setUpBeforeClass() throws IOException {
releaseId =
KieUtils.installArtifact(Paths.get(JUNIT_ECHO_KMODULE_PATH).toFile(),
Paths.get(JUNIT_ECHO_KMODULE_POM_PATH).toFile(),
@@ -56,6 +61,15 @@ public class MavenDroolsControllerTest {
Paths.get(JUNIT_ECHO_KMODULE_DRL_PATH).toFile());
}
+ @Before
+ public void setUp() {
+ running = new CountDownLatch(1);
+ }
+
+ public static void setRunning() {
+ running.countDown();
+ }
+
@Test
public void stop() throws InterruptedException {
createDroolsController(10000L).stop();
@@ -106,8 +120,8 @@ public class MavenDroolsControllerTest {
Assert.assertEquals(releaseId.getArtifactId(), controller.getContainer().getArtifactId());
Assert.assertEquals(releaseId.getVersion(), controller.getContainer().getVersion());
- /* courtesy timer to allow full initialization from local maven repository */
- Thread.sleep(courtesyStartTimeMs);
+ /* allow full initialization from local maven repository */
+ Assert.assertTrue(running.await(courtesyStartTimeMs, TimeUnit.MILLISECONDS));
Assert.assertEquals(1, controller.getSessionNames().size());
Assert.assertEquals(JUNIT_ECHO_KSESSION, controller.getSessionNames().get(0));
diff --git a/policy-management/src/test/java/org/onap/policy/drools/protocol/coders/ProtocolCoderToolsetTest.java b/policy-management/src/test/java/org/onap/policy/drools/protocol/coders/ProtocolCoderToolsetTest.java
index bd595725..7787a7b6 100644
--- a/policy-management/src/test/java/org/onap/policy/drools/protocol/coders/ProtocolCoderToolsetTest.java
+++ b/policy-management/src/test/java/org/onap/policy/drools/protocol/coders/ProtocolCoderToolsetTest.java
@@ -231,7 +231,7 @@ public class ProtocolCoderToolsetTest {
Properties sinkConfig = new Properties();
sinkConfig.put(PolicyEndPointProperties.PROPERTY_NOOP_SINK_TOPICS, JUNIT_PROTOCOL_CODER_TOPIC);
- final List<? extends TopicSink> noopTopics = TopicEndpointManager.getManager().addTopicSinks(sinkConfig);
+ final List<TopicSink> noopTopics = TopicEndpointManager.getManager().addTopicSinks(sinkConfig);
Properties droolsControllerConfig = new Properties();
droolsControllerConfig.put(DroolsPropertyConstants.RULES_GROUPID, releaseId.getGroupId());
diff --git a/policy-management/src/test/java/org/onap/policy/drools/server/restful/test/RestManagerTest.java b/policy-management/src/test/java/org/onap/policy/drools/server/restful/test/RestManagerTest.java
index 173c1738..237bd4df 100644
--- a/policy-management/src/test/java/org/onap/policy/drools/server/restful/test/RestManagerTest.java
+++ b/policy-management/src/test/java/org/onap/policy/drools/server/restful/test/RestManagerTest.java
@@ -171,13 +171,17 @@ public class RestManagerTest {
* @throws InterruptedException Interrupted exception
*/
@AfterClass
- public static void tearDown() throws IOException, InterruptedException {
+ public static void tearDown() throws IOException {
+ try {
+ client.close();
+ } catch (IOException ex) {
+ logger.warn("cannot close HTTP client connection", ex);
+ }
+
/* Shutdown managed resources */
PolicyControllerConstants.getFactory().shutdown();
TopicEndpointManager.getManager().shutdown();
PolicyEngineConstants.getManager().stop();
- Thread.sleep(10000L);
- client.close();
cleanUpWorkingDirs();
}
diff --git a/policy-management/src/test/java/org/onap/policy/drools/system/PolicyEngineTest.java b/policy-management/src/test/java/org/onap/policy/drools/system/PolicyEngineTest.java
index df1f6cca..997fc03e 100644
--- a/policy-management/src/test/java/org/onap/policy/drools/system/PolicyEngineTest.java
+++ b/policy-management/src/test/java/org/onap/policy/drools/system/PolicyEngineTest.java
@@ -20,6 +20,7 @@
package org.onap.policy.drools.system;
+import static org.awaitility.Awaitility.await;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
@@ -29,6 +30,7 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Properties;
+import java.util.concurrent.TimeUnit;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.FixMethodOrder;
@@ -302,7 +304,6 @@ public class PolicyEngineTest {
TopicEndpointManager.getManager().shutdown();
PolicyEngineConstants.getManager().stop();
- Thread.sleep(10000L);
- assertFalse(PolicyEngineConstants.getManager().isAlive());
+ await().atMost(10, TimeUnit.SECONDS).until(() -> !PolicyEngineConstants.getManager().isAlive());
}
}
diff --git a/policy-management/src/test/java/org/onap/policy/drools/system/internal/AggregatedPolicyControllerTest.java b/policy-management/src/test/java/org/onap/policy/drools/system/internal/AggregatedPolicyControllerTest.java
index 6f09ab9b..695893d4 100644
--- a/policy-management/src/test/java/org/onap/policy/drools/system/internal/AggregatedPolicyControllerTest.java
+++ b/policy-management/src/test/java/org/onap/policy/drools/system/internal/AggregatedPolicyControllerTest.java
@@ -924,7 +924,7 @@ public class AggregatedPolicyControllerTest {
// remaining methods should not have been invoked
assertThatThrownBy(() -> verifyBefore.accept(prov2)).isInstanceOf(AssertionError.class);
- assertThatThrownBy(() -> verifyMiddle.run()).isInstanceOf(AssertionError.class);
+ assertThatThrownBy(verifyMiddle::run).isInstanceOf(AssertionError.class);
assertThatThrownBy(() -> verifyAfter.accept(prov1)).isInstanceOf(AssertionError.class);
assertThatThrownBy(() -> verifyAfter.accept(prov2)).isInstanceOf(AssertionError.class);
diff --git a/policy-management/src/test/resources/echo.drl b/policy-management/src/test/resources/echo.drl
index 664df639..bd26f95b 100644
--- a/policy-management/src/test/resources/echo.drl
+++ b/policy-management/src/test/resources/echo.drl
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* ONAP
* ================================================================================
- * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -20,10 +20,13 @@
package org.onap.policy.drools.test;
+import org.onap.policy.drools.controller.internal.MavenDroolsControllerTest;
+
rule "INIT"
lock-on-active
when
then
+ MavenDroolsControllerTest.setRunning();
insert(new String("hello,I am up"));
end
diff --git a/policy-utils/src/main/java/org/onap/policy/drools/utils/logging/MdcTransactionImpl.java b/policy-utils/src/main/java/org/onap/policy/drools/utils/logging/MdcTransactionImpl.java
index f10343e2..ac90ab96 100644
--- a/policy-utils/src/main/java/org/onap/policy/drools/utils/logging/MdcTransactionImpl.java
+++ b/policy-utils/src/main/java/org/onap/policy/drools/utils/logging/MdcTransactionImpl.java
@@ -55,6 +55,7 @@ import java.time.Duration;
import java.time.Instant;
import java.util.Date;
import java.util.UUID;
+import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
@@ -221,111 +222,45 @@ class MdcTransactionImpl implements MdcTransaction {
*/
@Override
public MdcTransaction flush() {
- if (this.requestId != null && !this.requestId.isEmpty()) {
- MDC.put(REQUEST_ID, this.requestId);
- }
-
- if (this.invocationId != null && !this.invocationId.isEmpty()) {
- MDC.put(INVOCATION_ID, this.invocationId);
- }
-
- if (this.partner != null) {
- MDC.put(PARTNER_NAME, this.partner);
- }
-
- if (this.virtualServerName != null) {
- MDC.put(VIRTUAL_SERVER_NAME, this.virtualServerName);
- }
-
- if (this.serverName != null) {
- MDC.put(SERVER, this.serverName);
- }
-
- if (this.serverIpAddress != null) {
- MDC.put(SERVER_IP_ADDRESS, this.serverIpAddress);
- }
-
- if (this.serverFqdn != null) {
- MDC.put(SERVER_FQDN, this.serverFqdn);
- }
-
- if (this.serviceName != null) {
- MDC.put(SERVICE_NAME, this.serviceName);
- }
-
- if (this.startTime != null) {
- MDC.put(BEGIN_TIMESTAMP, timestamp(this.startTime));
- }
-
- if (this.endTime != null) {
- MDC.put(END_TIMESTAMP, timestamp(this.endTime));
- } else {
- this.setEndTime(null);
- MDC.put(END_TIMESTAMP, timestamp(this.endTime));
- }
+ setMdc(REQUEST_ID, this.requestId);
+ setMdc(INVOCATION_ID, this.invocationId);
+ setMdc(PARTNER_NAME, this.partner);
+ setMdc(VIRTUAL_SERVER_NAME, this.virtualServerName);
+ setMdc(SERVER, this.serverName);
+ setMdc(SERVER_IP_ADDRESS, this.serverIpAddress);
+ setMdc(SERVER_FQDN, this.serverFqdn);
+ setMdc(SERVICE_NAME, this.serviceName);
+ setMdc(BEGIN_TIMESTAMP, timestamp(this.startTime));
+ setMdc(END_TIMESTAMP, timestamp(this.endTime));
if (this.elapsedTime != null) {
MDC.put(ELAPSED_TIME, String.valueOf(this.elapsedTime));
- } else {
- if (endTime != null && startTime != null) {
- this.elapsedTime = Duration.between(startTime, endTime).toMillis();
- MDC.put(ELAPSED_TIME, String.valueOf(this.elapsedTime));
- }
- }
-
- if (this.serviceInstanceId != null) {
- MDC.put(SERVICE_INSTANCE_ID, this.serviceInstanceId);
- }
-
- if (this.instanceUuid != null) {
- MDC.put(INSTANCE_UUID, this.instanceUuid);
- }
-
- if (this.processKey != null) {
- MDC.put(PROCESS_KEY, this.processKey);
- }
-
- if (this.statusCode != null) {
- MDC.put(STATUS_CODE, this.statusCode);
- }
-
- if (this.responseCode != null) {
- MDC.put(RESPONSE_CODE, this.responseCode);
- }
-
- if (this.responseDescription != null) {
- MDC.put(RESPONSE_DESCRIPTION, this.responseDescription);
- }
-
- if (this.theSeverity != null) {
- MDC.put(SEVERITY, this.theSeverity);
- }
-
- if (this.alertSeverity != null) {
- MDC.put(ALERT_SEVERITY, this.alertSeverity);
- }
-
- if (this.targetEntity != null) {
- MDC.put(TARGET_ENTITY, this.targetEntity);
- }
-
- if (this.targetServiceName != null) {
- MDC.put(TARGET_SERVICE_NAME, this.targetServiceName);
+ } else if (endTime != null && startTime != null) {
+ this.elapsedTime = Duration.between(startTime, endTime).toMillis();
+ MDC.put(ELAPSED_TIME, String.valueOf(this.elapsedTime));
}
- if (this.targetVirtualEntity != null) {
- MDC.put(TARGET_VIRTUAL_ENTITY, this.targetVirtualEntity);
- }
+ setMdc(SERVICE_INSTANCE_ID, this.serviceInstanceId);
+ setMdc(INSTANCE_UUID, this.instanceUuid);
+ setMdc(PROCESS_KEY, this.processKey);
+ setMdc(STATUS_CODE, this.statusCode);
+ setMdc(RESPONSE_CODE, this.responseCode);
+ setMdc(RESPONSE_DESCRIPTION, this.responseDescription);
+ setMdc(SEVERITY, this.theSeverity);
+ setMdc(ALERT_SEVERITY, this.alertSeverity);
+ setMdc(TARGET_ENTITY, this.targetEntity);
+ setMdc(TARGET_SERVICE_NAME, this.targetServiceName);
+ setMdc(TARGET_VIRTUAL_ENTITY, this.targetVirtualEntity);
+ setMdc(CLIENT_IP_ADDRESS, this.clientIpAddress);
+ setMdc(REMOTE_HOST, this.remoteHost);
- if (this.clientIpAddress != null) {
- MDC.put(CLIENT_IP_ADDRESS, this.clientIpAddress);
- }
+ return this;
+ }
- if (this.remoteHost != null) {
- MDC.put(REMOTE_HOST, this.remoteHost);
+ private void setMdc(String paramName, String value) {
+ if (!StringUtils.isBlank(value)) {
+ MDC.put(paramName, value);
}
-
- return this;
}
@Override
@@ -689,6 +624,10 @@ class MdcTransactionImpl implements MdcTransaction {
@Override
public String timestamp(Instant time) {
+ if (time == null) {
+ return null;
+ }
+
return new SimpleDateFormat(DATE_FORMAT).format(Date.from(time));
}