diff options
21 files changed, 1698 insertions, 1739 deletions
diff --git a/feature-active-standby-management/src/main/java/org/onap/policy/drools/activestandby/DroolsPdpsElectionHandler.java b/feature-active-standby-management/src/main/java/org/onap/policy/drools/activestandby/DroolsPdpsElectionHandler.java index 33c8a8d8..85cf88b3 100644 --- a/feature-active-standby-management/src/main/java/org/onap/policy/drools/activestandby/DroolsPdpsElectionHandler.java +++ b/feature-active-standby-management/src/main/java/org/onap/policy/drools/activestandby/DroolsPdpsElectionHandler.java @@ -202,255 +202,7 @@ public class DroolsPdpsElectionHandler implements ThreadRunningChecker { logger.debug("DesignatedWaiter.run: pdps.size= {}", pdps.size()); //This is only true if all designated PDPs have failed - boolean designatedPdpHasFailed = pdpsConnector.hasDesignatedPdpFailed(pdps); - logger.debug("DesignatedWaiter.run: designatedPdpHasFailed= {}", designatedPdpHasFailed); - for (DroolsPdp pdp : pdps) { - logger.debug("DesignatedWaiter.run: evaluating pdp ID: {}", pdp.getPdpId()); - - /* - * Note: side effect of isPdpCurrent is that any stale but - * designated PDPs will be marked as un-designated. - */ - boolean isCurrent = pdpsConnector.isPdpCurrent(pdp); - - /* - * We can't use stateManagement.getStandbyStatus() here, because - * we need the standbyStatus, not for this PDP, but for the PDP - * being processed by this loop iteration. - */ - String standbyStatus = stateManagementFeature.getStandbyStatus(pdp.getPdpId()); - if (standbyStatus == null) { - // Treat this case as a cold standby -- if we - // abort here, no sessions will be created in a - // single-node test environment. - standbyStatus = StateManagement.COLD_STANDBY; - } - logger.debug("DesignatedWaiter.run: PDP= {}, isCurrent= {}", pdp.getPdpId(), isCurrent); - - /* - * There are 4 combinations of isDesignated and isCurrent. We will examine each one in-turn - * and evaluate the each pdp in the list of pdps against each combination. - * - * This is the first combination of isDesignated and isCurrent - */ - if (pdp.isDesignated() && isCurrent) { - //It is current, but it could have a standbystatus=coldstandby / hotstandby - //If so, we need to stand it down and demote it - if (!standbyStatus.equals(StateManagement.PROVIDING_SERVICE)) { - if (pdp.getPdpId().equals(myPdp.getPdpId())) { - logger.debug("\n\nDesignatedWaiter.run: myPdp {} is current and designated, " - + "butstandbystatus is not providingservice. " - + " Executing stateManagement.demote()" + "\n\n", myPdp.getPdpId()); - // So, we must demote it - try { - //Keep the order like this. StateManagement is last since it - //triggers controller shutdown - //This will change isDesignated and it can enter another if(combination) below - pdpsConnector.standDownPdp(pdp.getPdpId()); - myPdp.setDesignated(false); - isDesignated = false; - if (!(standbyStatus.equals(StateManagement.HOT_STANDBY) - || standbyStatus.equals(StateManagement.COLD_STANDBY))) { - /* - * Only demote it if it appears it has not already been demoted. Don't worry - * about synching with the topic endpoint states. That is done by the - * refreshStateAudit - */ - stateManagementFeature.demote(); - } - //update the standbystatus to check in a later - //combination of isDesignated and isCurrent - standbyStatus = stateManagementFeature.getStandbyStatus(pdp.getPdpId()); - } catch (Exception e) { - logger.error("DesignatedWaiter.run: myPdp: {} " - + "Caught Exception attempting to demote myPdp," - + "message= {}", myPdp.getPdpId(), e); - } - } else { - // Don't demote a remote PDP that is current. It should catch itself - logger.debug("\n\nDesignatedWaiter.run: myPdp {} is current and designated, " - + "but standbystatus is not providingservice. " - + " Cannot execute stateManagement.demote() " - + "since it it is not myPdp\n\n", - myPdp.getPdpId()); - } - - } else { - // If we get here, it is ok to be on the list - logger.debug("DesignatedWaiter.run: PDP= {} is designated, " - + "current and {} Noting PDP as " - + "designated, standbyStatus= {}", - pdp.getPdpId(), standbyStatus, standbyStatus); - listOfDesignated.add(pdp); - } - - - } - - - /* - * The second combination of isDesignated and isCurrent - * - * PDP is designated but not current; it has failed. - * So we stand it down (it doesn't matter what - * its standbyStatus is). None of these go on the list. - */ - if (pdp.isDesignated() && !isCurrent) { - logger.debug("INFO: DesignatedWaiter.run: PDP= {} is currently " - + "designated but is not current; " - + "it has failed. Standing down. standbyStatus= {}", - pdp.getPdpId(), standbyStatus); - /* - * Changes designated to 0 but it is still potentially providing service - * Will affect isDesignated, so, it can enter an if(combination) below - */ - pdpsConnector.standDownPdp(pdp.getPdpId()); - - //need to change standbystatus to coldstandby - if (pdp.getPdpId().equals(myPdp.getPdpId())) { - logger.debug("\n\nDesignatedWaiter.run: myPdp {} is not Current. " - + " Executing stateManagement.disableFailed()\n\n", myPdp.getPdpId()); - // We found that myPdp is designated but not current - // So, we must cause it to disableFail - try { - myPdp.setDesignated(false); - pdpsConnector.setDesignated(myPdp, false); - isDesignated = false; - stateManagementFeature.disableFailed(); - } catch (Exception e) { - logger.error("DesignatedWaiter.run: myPdp: {} Caught Exception " - + "attempting to disableFail myPdp {}, message= {}", - myPdp.getPdpId(), myPdp.getPdpId(), e); - } - } else { //it is a remote PDP that is failed - logger.debug("\n\nDesignatedWaiter.run: PDP {} is not Current. " - + " Executing stateManagement.disableFailed(otherResourceName)\n\n", - pdp.getPdpId() ); - // We found a PDP is designated but not current - // We already called standdown(pdp) which will change designated to false - // Now we need to disableFail it to get its states in synch. The standbyStatus - // should equal coldstandby - try { - stateManagementFeature.disableFailed(pdp.getPdpId()); - } catch (Exception e) { - logger.error("DesignatedWaiter.run: for PDP {} Caught Exception attempting to " - + "disableFail({}), message= {}", - pdp.getPdpId(), pdp.getPdpId(), e); - } - - } - continue; //we are not going to do anything else with this pdp - } - - /* - * The third combination of isDesignated and isCurrent - * /* - * If a PDP is not currently designated but is providing service - * (erroneous, but recoverable) or hot standby - * we can add it to the list of possible designated if all the designated have failed - */ - if (!pdp.isDesignated() && isCurrent) { - if (!(standbyStatus.equals(StateManagement.HOT_STANDBY) - || standbyStatus.equals(StateManagement.COLD_STANDBY))) { - logger.debug("\n\nDesignatedWaiter.run: PDP {}" - + " is NOT designated but IS current and" - + " has a standbystatus= {}", pdp.getPdpId(), standbyStatus); - // Since it is current, we assume it can adjust its own state. - // We will demote if it is myPdp - if (pdp.getPdpId().equals(myPdp.getPdpId())) { - //demote it - logger.debug("DesignatedWaiter.run: PDP {} going to " - + "setDesignated = false and calling stateManagement.demote", - pdp.getPdpId()); - try { - //Keep the order like this. - //StateManagement is last since it triggers controller shutdown - pdpsConnector.setDesignated(myPdp, false); - myPdp.setDesignated(false); - isDesignated = false; - //This is definitely not a redundant call. - //It is attempting to correct a problem - stateManagementFeature.demote(); - //recheck the standbystatus - standbyStatus = stateManagementFeature.getStandbyStatus(pdp.getPdpId()); - } catch (Exception e) { - logger.error("DesignatedWaiter.run: myPdp: {} Caught Exception " - + "attempting to demote myPdp {}, message = {}", myPdp.getPdpId(), - myPdp.getPdpId(), e); - } - - } - } - if (standbyStatus.equals(StateManagement.HOT_STANDBY) && designatedPdpHasFailed) { - //add it to the list - logger.debug("INFO: DesignatedWaiter.run: PDP= {}" - + " is not designated but is {} and designated PDP " - + "has failed. standbyStatus= {}", pdp.getPdpId(), - standbyStatus, standbyStatus); - listOfDesignated.add(pdp); - } - continue; //done with this one - } - - /* - * The fourth combination of isDesignated and isCurrent - * - * We are not going to put any of these on the list since it appears they have failed. - - * - */ - if (!pdp.isDesignated() && !isCurrent) { - logger.debug("INFO: DesignatedWaiter.run: PDP= {} " - + "designated= {}, current= {}, " - + "designatedPdpHasFailed= {}, " - + "standbyStatus= {}",pdp.getPdpId(), - pdp.isDesignated(), isCurrent, designatedPdpHasFailed, standbyStatus); - if (!standbyStatus.equals(StateManagement.COLD_STANDBY)) { - //stand it down - //disableFail it - pdpsConnector.standDownPdp(pdp.getPdpId()); - if (pdp.getPdpId().equals(myPdp.getPdpId())) { - /* - * I don't actually know how this condition could - * happen, but if it did, we would want to declare it - * failed. - */ - logger.debug("\n\nDesignatedWaiter.run: myPdp {} is !current and !designated, " - + " Executing stateManagement.disableFailed()\n\n", - myPdp.getPdpId()); - // So, we must disableFail it - try { - //Keep the order like this. - //StateManagement is last since it triggers controller shutdown - pdpsConnector.setDesignated(myPdp, false); - myPdp.setDesignated(false); - isDesignated = false; - stateManagementFeature.disableFailed(); - } catch (Exception e) { - logger.error("DesignatedWaiter.run: myPdp: {} Caught Exception attempting to " - + "disableFail myPdp {}, message= {}", - myPdp.getPdpId(), myPdp.getPdpId(), e); - } - } else { //it is remote - logger.debug("\n\nDesignatedWaiter.run: myPdp {} is !current and !designated, " - + " Executing stateManagement.disableFailed({})\n\n", - myPdp.getPdpId(), pdp.getPdpId()); - // We already called standdown(pdp) which will change designated to false - // Now we need to disableFail it to get its states in sync. - // StandbyStatus = coldstandby - try { - stateManagementFeature.disableFailed(pdp.getPdpId()); - } catch (Exception e) { - logger.error("DesignatedWaiter.run: for PDP {}" - + " Caught Exception attempting to disableFail({})" - + ", message=", pdp.getPdpId(), pdp.getPdpId(), e); - } - } - } - } - - - } // end pdps loop + allPdpsFailed(pdps, listOfDesignated); /* * We have checked the four combinations of isDesignated and isCurrent. Where appropriate, @@ -479,7 +231,7 @@ public class DroolsPdpsElectionHandler implements ThreadRunningChecker { /* - * It is possible to get here with more than one pdp designated and providingservice. This normally + * It is possible to get here with more than one pdp designated and providing service. This normally * occurs when there is a race condition with multiple nodes coming up at the same time. If that is * the case we must determine which one is the one that should be designated and which one should * be demoted. @@ -490,82 +242,22 @@ public class DroolsPdpsElectionHandler implements ThreadRunningChecker { * the previously designated pdp. */ DroolsPdp designatedPdp = computeDesignatedPdp(listOfDesignated, mostRecentPrimary); - if (designatedPdp != null) { - pdpdNowActive = designatedPdp.getPdpId(); - } if (designatedPdp == null) { logger.warn("WARNING: DesignatedWaiter.run: No viable PDP found to be Designated. " + "designatedPdp still null."); - // Just to be sure the parameters are correctly set - myPdp.setDesignated(false); - pdpsConnector.setDesignated(myPdp,false); - isDesignated = false; - - waitTimerLastRunDate = new Date(); - logger.debug("DesignatedWaiter.run (designatedPdp == null) waitTimerLastRunDate = {}", - waitTimerLastRunDate); - myPdp.setUpdatedDate(waitTimerLastRunDate); - pdpsConnector.update(myPdp); - + designateNoPdp(); return; + } - } else if (designatedPdp.getPdpId().equals(myPdp.getPdpId())) { - logger.debug("DesignatedWaiter.run: designatedPdp is PDP={}", myPdp.getPdpId()); - /* - * update function expects myPdp.isDesignated to be true. - */ - try { - //Keep the order like this. StateManagement is last since it triggers controller init - myPdp.setDesignated(true); - myPdp.setDesignatedDate(new Date()); - pdpsConnector.setDesignated(myPdp, true); - isDesignated = true; - String standbyStatus = stateManagementFeature.getStandbyStatus(); - if (!standbyStatus.equals(StateManagement.PROVIDING_SERVICE)) { - /* - * Only call promote if it is not already in the right state. Don't worry about - * synching the lower level topic endpoint states. That is done by the - * refreshStateAudit. - * Note that we need to fetch the session list from 'mostRecentPrimary' - * at this point -- soon, 'mostRecentPrimary' will be set to this host. - */ - //this.sessions = mostRecentPrimary.getSessions(); - stateManagementFeature.promote(); - } - } catch (Exception e) { - logger.error("ERROR: DesignatedWaiter.run: Caught Exception attempting to promote PDP={}" - + ", message=", myPdp.getPdpId(), e); - myPdp.setDesignated(false); - pdpsConnector.setDesignated(myPdp,false); - isDesignated = false; - //If you can't promote it, demote it - try { - String standbyStatus = stateManagementFeature.getStandbyStatus(); - if (!(standbyStatus.equals(StateManagement.HOT_STANDBY) - || standbyStatus.equals(StateManagement.COLD_STANDBY))) { - /* - * Only call demote if it is not already in the right state. Don't worry about - * synching the lower level topic endpoint states. That is done by the - * refreshStateAudit. - */ - stateManagementFeature.demote(); - } - } catch (Exception e1) { - logger.error("ERROR: DesignatedWaiter.run: Caught StandbyStatusException " - + "attempting to promote then demote PDP={}, message=", - myPdp.getPdpId(), e1); - } - - } - waitTimerLastRunDate = new Date(); - logger.debug("DesignatedWaiter.run (designatedPdp.getPdpId().equals(myPdp.getPdpId())) " - + "waitTimerLastRunDate = " + waitTimerLastRunDate); - myPdp.setUpdatedDate(waitTimerLastRunDate); - pdpsConnector.update(myPdp); + pdpdNowActive = designatedPdp.getPdpId(); + if (pdpdNowActive.equals(myPdp.getPdpId())) { + logger.debug("DesignatedWaiter.run: designatedPdp is PDP={}", myPdp.getPdpId()); + designateMyPdp(); return; } + isDesignated = false; } // end synchronized @@ -583,6 +275,350 @@ public class DroolsPdpsElectionHandler implements ThreadRunningChecker { logger.error("DesignatedWaiter.run caught an unexpected exception: ", e); } } // end run + + private void allPdpsFailed(Collection<DroolsPdp> pdps, List<DroolsPdp> listOfDesignated) { + boolean designatedPdpHasFailed = pdpsConnector.hasDesignatedPdpFailed(pdps); + logger.debug("DesignatedWaiter.run: designatedPdpHasFailed= {}", designatedPdpHasFailed); + for (DroolsPdp pdp : pdps) { + logger.debug("DesignatedWaiter.run: evaluating pdp ID: {}", pdp.getPdpId()); + + /* + * Note: side effect of isPdpCurrent is that any stale but + * designated PDPs will be marked as un-designated. + */ + boolean isCurrent = pdpsConnector.isPdpCurrent(pdp); + + /* + * We can't use stateManagement.getStandbyStatus() here, because + * we need the standbyStatus, not for this PDP, but for the PDP + * being processed by this loop iteration. + */ + String standbyStatus = stateManagementFeature.getStandbyStatus(pdp.getPdpId()); + if (standbyStatus == null) { + // Treat this case as a cold standby -- if we + // abort here, no sessions will be created in a + // single-node test environment. + standbyStatus = StateManagement.COLD_STANDBY; + } + logger.debug("DesignatedWaiter.run: PDP= {}, isCurrent= {}", pdp.getPdpId(), isCurrent); + + adjustPdp(pdp, isCurrent, designatedPdpHasFailed, standbyStatus, listOfDesignated); + + + } // end pdps loop + } + + private void adjustPdp(DroolsPdp pdp, boolean isCurrent, boolean designatedPdpHasFailed, String standbyStatus, + List<DroolsPdp> listOfDesignated) { + /* + * There are 4 combinations of isDesignated and isCurrent. We will examine each one in-turn + * and evaluate the each pdp in the list of pdps against each combination. + */ + if (pdp.isDesignated()) { + /* + * This is the first combination of isDesignated and isCurrent + */ + if (isCurrent) { + pdpDesignatedCurrent(pdp, standbyStatus, listOfDesignated); + + /* + * The second combination of isDesignated and isCurrent + * + * PDP is designated but not current; it has failed. + * So we stand it down (it doesn't matter what + * its standbyStatus is). None of these go on the list. + */ + } else { + logger.debug("INFO: DesignatedWaiter.run: PDP= {} is currently " + + "designated but is not current; " + + "it has failed. Standing down. standbyStatus= {}", + pdp.getPdpId(), standbyStatus); + pdpDesignatedNotCurrent(pdp); + } + + } else { + // NOT designated + + + /* + * The third combination of isDesignated and isCurrent + * /* + * If a PDP is not currently designated but is providing service + * (erroneous, but recoverable) or hot standby + * we can add it to the list of possible designated if all the designated have failed + */ + if (isCurrent) { + pdpNotDesignatedCurrent(pdp, designatedPdpHasFailed, standbyStatus, + listOfDesignated); + + /* + * The fourth combination of isDesignated and isCurrent + * + * We are not going to put any of these on the list since it appears they have failed. + * + */ + } else { + logger.debug("INFO: DesignatedWaiter.run: PDP= {} " + + "designated= {}, current= {}, " + + "designatedPdpHasFailed= {}, " + + "standbyStatus= {}",pdp.getPdpId(), + pdp.isDesignated(), false, designatedPdpHasFailed, standbyStatus); + pdpNotDesignatedNotCurrent(pdp, standbyStatus); + } + } + } + + private void pdpDesignatedCurrent(DroolsPdp pdp, String standbyStatus, List<DroolsPdp> listOfDesignated) { + //It is current, but it could have a standbystatus=coldstandby / hotstandby + //If so, we need to stand it down and demote it + if (!standbyStatus.equals(StateManagement.PROVIDING_SERVICE)) { + if (pdp.getPdpId().equals(myPdp.getPdpId())) { + logger.debug("\n\nDesignatedWaiter.run: myPdp {} is current and designated, " + + "butstandbystatus is not providingservice. " + + " Executing stateManagement.demote()" + "\n\n", myPdp.getPdpId()); + // So, we must demote it + try { + demoteMyPdp(pdp, standbyStatus); + } catch (Exception e) { + logger.error("DesignatedWaiter.run: myPdp: {} " + + "Caught Exception attempting to demote myPdp," + + "message= {}", myPdp.getPdpId(), e); + } + } else { + // Don't demote a remote PDP that is current. It should catch itself + logger.debug("\n\nDesignatedWaiter.run: myPdp {} is current and designated, " + + "but standbystatus is not providingservice. " + + " Cannot execute stateManagement.demote() " + + "since it it is not myPdp\n\n", + myPdp.getPdpId()); + } + + } else { + // If we get here, it is ok to be on the list + logger.debug("DesignatedWaiter.run: PDP= {} is designated, " + + "current and {} Noting PDP as " + + "designated, standbyStatus= {}", + pdp.getPdpId(), standbyStatus, standbyStatus); + listOfDesignated.add(pdp); + } + } + + private void demoteMyPdp(DroolsPdp pdp, String standbyStatus) throws Exception { + //Keep the order like this. StateManagement is last since it + //triggers controller shutdown + //This will change isDesignated and it can enter another if(combination) below + pdpsConnector.standDownPdp(pdp.getPdpId()); + myPdp.setDesignated(false); + isDesignated = false; + if (!(standbyStatus.equals(StateManagement.HOT_STANDBY) + || standbyStatus.equals(StateManagement.COLD_STANDBY))) { + /* + * Only demote it if it appears it has not already been demoted. Don't worry + * about synching with the topic endpoint states. That is done by the + * refreshStateAudit + */ + stateManagementFeature.demote(); + } + } + + private void pdpDesignatedNotCurrent(DroolsPdp pdp) { + /* + * Changes designated to 0 but it is still potentially providing service + * Will affect isDesignated, so, it can enter an if(combination) below + */ + pdpsConnector.standDownPdp(pdp.getPdpId()); + + //need to change standbystatus to coldstandby + if (pdp.getPdpId().equals(myPdp.getPdpId())) { + logger.debug("\n\nDesignatedWaiter.run: myPdp {} is not Current. " + + " Executing stateManagement.disableFailed()\n\n", myPdp.getPdpId()); + // We found that myPdp is designated but not current + // So, we must cause it to disableFail + try { + myPdp.setDesignated(false); + pdpsConnector.setDesignated(myPdp, false); + isDesignated = false; + stateManagementFeature.disableFailed(); + } catch (Exception e) { + logger.error("DesignatedWaiter.run: myPdp: {} Caught Exception " + + "attempting to disableFail myPdp {}, message= {}", + myPdp.getPdpId(), myPdp.getPdpId(), e); + } + } else { //it is a remote PDP that is failed + logger.debug("\n\nDesignatedWaiter.run: PDP {} is not Current. " + + " Executing stateManagement.disableFailed(otherResourceName)\n\n", + pdp.getPdpId() ); + // We found a PDP is designated but not current + // We already called standdown(pdp) which will change designated to false + // Now we need to disableFail it to get its states in synch. The standbyStatus + // should equal coldstandby + try { + stateManagementFeature.disableFailed(pdp.getPdpId()); + } catch (Exception e) { + logger.error("DesignatedWaiter.run: for PDP {} Caught Exception attempting to " + + "disableFail({}), message= {}", + pdp.getPdpId(), pdp.getPdpId(), e); + } + + } + } + + private void pdpNotDesignatedCurrent(DroolsPdp pdp, boolean designatedPdpHasFailed, String standbyStatus, + List<DroolsPdp> listOfDesignated) { + if (!(StateManagement.HOT_STANDBY.equals(standbyStatus) + || StateManagement.COLD_STANDBY.equals(standbyStatus))) { + logger.debug("\n\nDesignatedWaiter.run: PDP {}" + + " is NOT designated but IS current and" + + " has a standbystatus= {}", pdp.getPdpId(), standbyStatus); + // Since it is current, we assume it can adjust its own state. + // We will demote if it is myPdp + if (pdp.getPdpId().equals(myPdp.getPdpId())) { + //demote it + logger.debug("DesignatedWaiter.run: PDP {} going to " + + "setDesignated = false and calling stateManagement.demote", + pdp.getPdpId()); + try { + //Keep the order like this. + //StateManagement is last since it triggers controller shutdown + pdpsConnector.setDesignated(myPdp, false); + myPdp.setDesignated(false); + isDesignated = false; + //This is definitely not a redundant call. + //It is attempting to correct a problem + stateManagementFeature.demote(); + //recheck the standbystatus + standbyStatus = stateManagementFeature.getStandbyStatus(pdp.getPdpId()); + } catch (Exception e) { + logger.error("DesignatedWaiter.run: myPdp: {} Caught Exception " + + "attempting to demote myPdp {}, message = {}", myPdp.getPdpId(), + myPdp.getPdpId(), e); + } + + } + } + if (StateManagement.HOT_STANDBY.equals(standbyStatus) && designatedPdpHasFailed) { + //add it to the list + logger.debug("INFO: DesignatedWaiter.run: PDP= {}" + + " is not designated but is {} and designated PDP " + + "has failed. standbyStatus= {}", pdp.getPdpId(), + standbyStatus, standbyStatus); + listOfDesignated.add(pdp); + } + } + + private void pdpNotDesignatedNotCurrent(DroolsPdp pdp, String standbyStatus) { + if (StateManagement.COLD_STANDBY.equals(standbyStatus)) { + return; + } + + //stand it down + //disableFail it + pdpsConnector.standDownPdp(pdp.getPdpId()); + if (pdp.getPdpId().equals(myPdp.getPdpId())) { + /* + * I don't actually know how this condition could + * happen, but if it did, we would want to declare it + * failed. + */ + logger.debug("\n\nDesignatedWaiter.run: myPdp {} is !current and !designated, " + + " Executing stateManagement.disableFailed()\n\n", + myPdp.getPdpId()); + // So, we must disableFail it + try { + //Keep the order like this. + //StateManagement is last since it triggers controller shutdown + pdpsConnector.setDesignated(myPdp, false); + myPdp.setDesignated(false); + isDesignated = false; + stateManagementFeature.disableFailed(); + } catch (Exception e) { + logger.error("DesignatedWaiter.run: myPdp: {} Caught Exception attempting to " + + "disableFail myPdp {}, message= {}", + myPdp.getPdpId(), myPdp.getPdpId(), e); + } + } else { //it is remote + logger.debug("\n\nDesignatedWaiter.run: myPdp {} is !current and !designated, " + + " Executing stateManagement.disableFailed({})\n\n", + myPdp.getPdpId(), pdp.getPdpId()); + // We already called standdown(pdp) which will change designated to false + // Now we need to disableFail it to get its states in sync. + // StandbyStatus = coldstandby + try { + stateManagementFeature.disableFailed(pdp.getPdpId()); + } catch (Exception e) { + logger.error("DesignatedWaiter.run: for PDP {}" + + " Caught Exception attempting to disableFail({})" + + ", message=", pdp.getPdpId(), pdp.getPdpId(), e); + } + } + } + + private void designateNoPdp() { + // Just to be sure the parameters are correctly set + myPdp.setDesignated(false); + pdpsConnector.setDesignated(myPdp,false); + isDesignated = false; + + waitTimerLastRunDate = new Date(); + logger.debug("DesignatedWaiter.run (designatedPdp == null) waitTimerLastRunDate = {}", + waitTimerLastRunDate); + myPdp.setUpdatedDate(waitTimerLastRunDate); + pdpsConnector.update(myPdp); + } + + private void designateMyPdp() { + /* + * update function expects myPdp.isDesignated to be true. + */ + try { + //Keep the order like this. StateManagement is last since it triggers controller init + myPdp.setDesignated(true); + myPdp.setDesignatedDate(new Date()); + pdpsConnector.setDesignated(myPdp, true); + isDesignated = true; + String standbyStatus = stateManagementFeature.getStandbyStatus(); + if (!standbyStatus.equals(StateManagement.PROVIDING_SERVICE)) { + /* + * Only call promote if it is not already in the right state. Don't worry about + * synching the lower level topic endpoint states. That is done by the + * refreshStateAudit. + * Note that we need to fetch the session list from 'mostRecentPrimary' + * at this point -- soon, 'mostRecentPrimary' will be set to this host. + */ + //this.sessions = mostRecentPrimary.getSessions(); + stateManagementFeature.promote(); + } + } catch (Exception e) { + logger.error("ERROR: DesignatedWaiter.run: Caught Exception attempting to promote PDP={}" + + ", message=", myPdp.getPdpId(), e); + myPdp.setDesignated(false); + pdpsConnector.setDesignated(myPdp,false); + isDesignated = false; + //If you can't promote it, demote it + try { + String standbyStatus = stateManagementFeature.getStandbyStatus(); + if (!(standbyStatus.equals(StateManagement.HOT_STANDBY) + || standbyStatus.equals(StateManagement.COLD_STANDBY))) { + /* + * Only call demote if it is not already in the right state. Don't worry about + * synching the lower level topic endpoint states. That is done by the + * refreshStateAudit. + */ + stateManagementFeature.demote(); + } + } catch (Exception e1) { + logger.error("ERROR: DesignatedWaiter.run: Caught StandbyStatusException " + + "attempting to promote then demote PDP={}, message=", + myPdp.getPdpId(), e1); + } + + } + waitTimerLastRunDate = new Date(); + logger.debug("DesignatedWaiter.run (designatedPdp.getPdpId().equals(myPdp.getPdpId())) " + + "waitTimerLastRunDate = " + waitTimerLastRunDate); + myPdp.setUpdatedDate(waitTimerLastRunDate); + pdpsConnector.update(myPdp); + } } /** @@ -621,53 +657,27 @@ public class DroolsPdpsElectionHandler implements ThreadRunningChecker { * @return drools pdp object */ public DroolsPdp computeMostRecentPrimary(Collection<DroolsPdp> pdps, List<DroolsPdp> listOfDesignated) { - boolean containsDesignated = false; - for (DroolsPdp pdp : listOfDesignated) { - if (pdp.isDesignated()) { - containsDesignated = true; - } - } + boolean containsDesignated = listOfDesignated.stream().anyMatch(DroolsPdp::isDesignated); + DroolsPdp mostRecentPrimary = new DroolsPdpImpl(null, true, 1, new Date(0)); mostRecentPrimary.setSite(null); logger.debug("DesignatedWaiter.run listOfDesignated.size() = {}", listOfDesignated.size()); + if (listOfDesignated.size() <= 1) { logger.debug("DesignatedWainter.run: listOfDesignated.size <=1"); //Only one or none is designated or hot standby. Choose the latest designated date - for (DroolsPdp pdp : pdps) { - logger.debug("DesignatedWaiter.run pdp = {}" - + " pdp.getDesignatedDate() = {}", - pdp.getPdpId(), pdp.getDesignatedDate()); - if (pdp.getDesignatedDate().compareTo(mostRecentPrimary.getDesignatedDate()) > 0) { - mostRecentPrimary = pdp; - logger.debug(RUN_PRIMARY_MSG, mostRecentPrimary.getPdpId()); - } - } + mostRecentPrimary = getLatestDesignated(pdps, mostRecentPrimary); + } else if (listOfDesignated.size() == pdps.size()) { logger.debug("DesignatedWainter.run: listOfDesignated.size = pdps.size() which is {}", pdps.size()); //They are all designated or all hot standby. - mostRecentPrimary = null; - for (DroolsPdp pdp : pdps) { - if (mostRecentPrimary == null) { - mostRecentPrimary = pdp; - continue; - } - if (containsDesignated) { //Choose the site of the first designated date - if (pdp.getDesignatedDate().compareTo(mostRecentPrimary.getDesignatedDate()) < 0) { - mostRecentPrimary = pdp; - logger.debug(RUN_PRIMARY_MSG, mostRecentPrimary.getPdpId()); - } - } else { //Choose the site with the latest designated date - if (pdp.getDesignatedDate().compareTo(mostRecentPrimary.getDesignatedDate()) > 0) { - mostRecentPrimary = pdp; - logger.debug(RUN_PRIMARY_MSG, mostRecentPrimary.getPdpId()); - } - } - } + mostRecentPrimary = getBestDesignated(pdps, containsDesignated); + } else { logger.debug("DesignatedWainter.run: Some but not all are designated or hot standby. "); + logger.debug("DesignatedWainter.run: containsDesignated = {}", containsDesignated); //Some but not all are designated or hot standby. if (containsDesignated) { - logger.debug("DesignatedWainter.run: containsDesignated = {}", containsDesignated); /* * The list only contains designated. This is a problem. It is most likely a race * condition that resulted in two thinking they should be designated. Choose the @@ -675,29 +685,66 @@ public class DroolsPdpsElectionHandler implements ThreadRunningChecker { * This should be the site that had the last designation before this race condition * occurred. */ - for (DroolsPdp pdp : pdps) { - if (listOfDesignated.contains(pdp)) { - continue; //Don't consider this entry - } - if (pdp.getDesignatedDate().compareTo(mostRecentPrimary.getDesignatedDate()) > 0) { - mostRecentPrimary = pdp; - logger.debug(RUN_PRIMARY_MSG, mostRecentPrimary.getPdpId()); - } - } + mostRecentPrimary = getLatestUndesignated(pdps, mostRecentPrimary, listOfDesignated); + } else { - logger.debug("DesignatedWainter.run: containsDesignated = {}", containsDesignated); //The list only contains hot standby. Choose the site of the latest designated date - for (DroolsPdp pdp : pdps) { - if (pdp.getDesignatedDate().compareTo(mostRecentPrimary.getDesignatedDate()) > 0) { - mostRecentPrimary = pdp; - logger.debug(RUN_PRIMARY_MSG, mostRecentPrimary.getPdpId()); - } + mostRecentPrimary = getLatestDesignated(pdps, mostRecentPrimary); + } + } + return mostRecentPrimary; + } + + private DroolsPdp getBestDesignated(Collection<DroolsPdp> pdps, boolean containsDesignated) { + DroolsPdp mostRecentPrimary; + mostRecentPrimary = null; + for (DroolsPdp pdp : pdps) { + if (mostRecentPrimary == null) { + mostRecentPrimary = pdp; + continue; + } + if (containsDesignated) { //Choose the site of the first designated date + if (pdp.getDesignatedDate().compareTo(mostRecentPrimary.getDesignatedDate()) < 0) { + mostRecentPrimary = pdp; + logger.debug(RUN_PRIMARY_MSG, mostRecentPrimary.getPdpId()); + } + } else { //Choose the site with the latest designated date + if (pdp.getDesignatedDate().compareTo(mostRecentPrimary.getDesignatedDate()) > 0) { + mostRecentPrimary = pdp; + logger.debug(RUN_PRIMARY_MSG, mostRecentPrimary.getPdpId()); } } } return mostRecentPrimary; } + private DroolsPdp getLatestUndesignated(Collection<DroolsPdp> pdps, DroolsPdp mostRecentPrimary, + List<DroolsPdp> listOfDesignated) { + for (DroolsPdp pdp : pdps) { + if (listOfDesignated.contains(pdp)) { + continue; //Don't consider this entry + } + if (pdp.getDesignatedDate().compareTo(mostRecentPrimary.getDesignatedDate()) > 0) { + mostRecentPrimary = pdp; + logger.debug(RUN_PRIMARY_MSG, mostRecentPrimary.getPdpId()); + } + } + return mostRecentPrimary; + } + + private DroolsPdp getLatestDesignated(Collection<DroolsPdp> pdps, DroolsPdp mostRecentPrimary) { + for (DroolsPdp pdp : pdps) { + logger.debug("DesignatedWaiter.run pdp = {}" + + " pdp.getDesignatedDate() = {}", + pdp.getPdpId(), pdp.getDesignatedDate()); + if (pdp.getDesignatedDate().compareTo(mostRecentPrimary.getDesignatedDate()) > 0) { + mostRecentPrimary = pdp; + logger.debug(RUN_PRIMARY_MSG, mostRecentPrimary.getPdpId()); + } + } + return mostRecentPrimary; + } + /** * Compue designated pdp. * @@ -706,125 +753,148 @@ public class DroolsPdpsElectionHandler implements ThreadRunningChecker { * @return drools pdp object */ public DroolsPdp computeDesignatedPdp(List<DroolsPdp> listOfDesignated, DroolsPdp mostRecentPrimary) { - DroolsPdp designatedPdp = null; - DroolsPdp lowestPriorityPdp = null; - if (listOfDesignated.size() > 1) { - logger.debug("DesignatedWaiter.run: myPdp: {} listOfDesignated.size(): {}", myPdp.getPdpId(), - listOfDesignated.size()); - DroolsPdp rejectedPdp = null; - DroolsPdp lowestPrioritySameSite = null; - DroolsPdp lowestPriorityDifferentSite = null; - for (DroolsPdp pdp : listOfDesignated) { - // We need to determine if another PDP is the lowest priority - if (nullSafeEquals(pdp.getSite(),mostRecentPrimary.getSite())) { - if (lowestPrioritySameSite == null) { - if (lowestPriorityDifferentSite != null) { - rejectedPdp = lowestPriorityDifferentSite; - } - lowestPrioritySameSite = pdp; - } else { - if (pdp.getPdpId().equals((lowestPrioritySameSite.getPdpId()))) { - continue;//nothing to compare - } - if (pdp.comparePriority(lowestPrioritySameSite) < 0) { - logger.debug("\nDesignatedWaiter.run: myPdp {} listOfDesignated pdp ID: {}" - + " has lower priority than pdp ID: {}",myPdp.getPdpId(), pdp.getPdpId(), - lowestPrioritySameSite.getPdpId()); - //we need to reject lowestPrioritySameSite - rejectedPdp = lowestPrioritySameSite; - lowestPrioritySameSite = pdp; - } else { - //we need to reject pdp and keep lowestPrioritySameSite - logger.debug("\nDesignatedWaiter.run: myPdp {} listOfDesignated pdp ID: {} " - + " has higher priority than pdp ID: {}", myPdp.getPdpId(),pdp.getPdpId(), - lowestPrioritySameSite.getPdpId()); - rejectedPdp = pdp; - } - } - } else { - if (lowestPrioritySameSite != null) { - //if we already have a candidate for same site, we don't want to bother with different sites - rejectedPdp = pdp; - } else { - if (lowestPriorityDifferentSite == null) { - lowestPriorityDifferentSite = pdp; - continue; - } - if (pdp.getPdpId().equals((lowestPriorityDifferentSite.getPdpId()))) { - continue;//nothing to compare - } - if (pdp.comparePriority(lowestPriorityDifferentSite) < 0) { - logger.debug("\nDesignatedWaiter.run: myPdp {} listOfDesignated pdp ID: {}" - + " has lower priority than pdp ID: {}", myPdp.getPdpId(), pdp.getPdpId(), - lowestPriorityDifferentSite.getPdpId()); - //we need to reject lowestPriorityDifferentSite - rejectedPdp = lowestPriorityDifferentSite; - lowestPriorityDifferentSite = pdp; - } else { - //we need to reject pdp and keep lowestPriorityDifferentSite - logger.debug("\nDesignatedWaiter.run: myPdp {} listOfDesignated pdp ID: {}" - + " has higher priority than pdp ID: {}", myPdp.getPdpId(), pdp.getPdpId(), - lowestPriorityDifferentSite.getPdpId()); - rejectedPdp = pdp; - } - } + if (listOfDesignated.isEmpty()) { + logger.debug("\nDesignatedWaiter.run: myPdp: {} listOfDesignated is: EMPTY.", myPdp.getPdpId()); + return null; + } + + if (listOfDesignated.size() == 1) { + logger.debug("\nDesignatedWaiter.run: myPdp: {} listOfDesignated " + + "has ONE entry. PDP ID: {}", myPdp.getPdpId(), listOfDesignated.get(0).getPdpId()); + return listOfDesignated.get(0); + } + + logger.debug("DesignatedWaiter.run: myPdp: {} listOfDesignated.size(): {}", myPdp.getPdpId(), + listOfDesignated.size()); + DesignatedData data = new DesignatedData(); + for (DroolsPdp pdp : listOfDesignated) { + DroolsPdp rejectedPdp; + + // We need to determine if another PDP is the lowest priority + if (nullSafeEquals(pdp.getSite(), mostRecentPrimary.getSite())) { + rejectedPdp = data.compareSameSite(pdp); + } else { + rejectedPdp = data.compareDifferentSite(pdp); + } + // If the rejectedPdp is myPdp, we need to stand it down and demote it. Each pdp is responsible + // for demoting itself + if (rejectedPdp != null && nullSafeEquals(rejectedPdp.getPdpId(),myPdp.getPdpId())) { + logger.debug("\n\nDesignatedWaiter.run: myPdp: {} listOfDesignated myPdp ID: {}" + + " is NOT the lowest priority. Executing stateManagement.demote()\n\n", + myPdp.getPdpId(), + myPdp.getPdpId()); + // We found that myPdp is on the listOfDesignated and it is not the lowest priority + // So, we must demote it + demoteMyPdp(); + } + } //end: for(DroolsPdp pdp : listOfDesignated) + + DroolsPdp lowestPriorityPdp = data.getLowestPriority(); + + //now we have a valid value for lowestPriorityPdp + logger.debug("\n\nDesignatedWaiter.run: myPdp: {} listOfDesignated " + + "found the LOWEST priority pdp ID: {} " + + " It is now the designatedPpd from the perspective of myPdp ID: {} \n\n", + myPdp.getPdpId(), lowestPriorityPdp.getPdpId(), myPdp); + return lowestPriorityPdp; + + } + + private class DesignatedData { + private DroolsPdp lowestPrioritySameSite = null; + private DroolsPdp lowestPriorityDifferentSite = null; + + private DroolsPdp compareSameSite(DroolsPdp pdp) { + if (lowestPrioritySameSite == null) { + if (lowestPriorityDifferentSite != null) { + //we need to reject lowestPriorityDifferentSite + DroolsPdp rejectedPdp = lowestPriorityDifferentSite; + lowestPriorityDifferentSite = pdp; + return rejectedPdp; } - // If the rejectedPdp is myPdp, we need to stand it down and demote it. Each pdp is responsible - // for demoting itself - if (rejectedPdp != null && nullSafeEquals(rejectedPdp.getPdpId(),myPdp.getPdpId())) { - logger.debug("\n\nDesignatedWaiter.run: myPdp: {} listOfDesignated myPdp ID: {}" - + " is NOT the lowest priority. Executing stateManagement.demote()\n\n", - myPdp.getPdpId(), - myPdp.getPdpId()); - // We found that myPdp is on the listOfDesignated and it is not the lowest priority - // So, we must demote it - try { - //Keep the order like this. StateManagement is last since it triggers controller shutdown - myPdp.setDesignated(false); - pdpsConnector.setDesignated(myPdp, false); - isDesignated = false; - String standbyStatus = stateManagementFeature.getStandbyStatus(); - if (!(standbyStatus.equals(StateManagement.HOT_STANDBY) - || standbyStatus.equals(StateManagement.COLD_STANDBY))) { - /* - * Only call demote if it is not already in the right state. Don't worry about - * synching the lower level topic endpoint states. That is done by the - * refreshStateAudit. - */ - stateManagementFeature.demote(); - } - } catch (Exception e) { - myPdp.setDesignated(false); - pdpsConnector.setDesignated(myPdp, false); - isDesignated = false; - logger.error("DesignatedWaiter.run: myPdp: {} Caught Exception attempting to " - + "demote myPdp {} myPdp.getPdpId(), message= {}", myPdp.getPdpId(), - e); - } + lowestPrioritySameSite = pdp; + return null; + } else { + if (pdp.getPdpId().equals((lowestPrioritySameSite.getPdpId()))) { + return null;//nothing to compare } - } //end: for(DroolsPdp pdp : listOfDesignated) + if (pdp.comparePriority(lowestPrioritySameSite) < 0) { + logger.debug("\nDesignatedWaiter.run: myPdp {} listOfDesignated pdp ID: {}" + + " has lower priority than pdp ID: {}",myPdp.getPdpId(), pdp.getPdpId(), + lowestPrioritySameSite.getPdpId()); + //we need to reject lowestPrioritySameSite + DroolsPdp rejectedPdp = lowestPrioritySameSite; + lowestPrioritySameSite = pdp; + return rejectedPdp; + } else { + //we need to reject pdp and keep lowestPrioritySameSite + logger.debug("\nDesignatedWaiter.run: myPdp {} listOfDesignated pdp ID: {} " + + " has higher priority than pdp ID: {}", myPdp.getPdpId(),pdp.getPdpId(), + lowestPrioritySameSite.getPdpId()); + return pdp; + } + } + } + + private DroolsPdp compareDifferentSite(DroolsPdp pdp) { if (lowestPrioritySameSite != null) { - lowestPriorityPdp = lowestPrioritySameSite; + //if we already have a candidate for same site, we don't want to bother with different sites + return pdp; } else { - lowestPriorityPdp = lowestPriorityDifferentSite; + if (lowestPriorityDifferentSite == null) { + lowestPriorityDifferentSite = pdp; + return null; + } + if (pdp.getPdpId().equals((lowestPriorityDifferentSite.getPdpId()))) { + return null;//nothing to compare + } + if (pdp.comparePriority(lowestPriorityDifferentSite) < 0) { + logger.debug("\nDesignatedWaiter.run: myPdp {} listOfDesignated pdp ID: {}" + + " has lower priority than pdp ID: {}", myPdp.getPdpId(), pdp.getPdpId(), + lowestPriorityDifferentSite.getPdpId()); + //we need to reject lowestPriorityDifferentSite + DroolsPdp rejectedPdp = lowestPriorityDifferentSite; + lowestPriorityDifferentSite = pdp; + return rejectedPdp; + } else { + //we need to reject pdp and keep lowestPriorityDifferentSite + logger.debug("\nDesignatedWaiter.run: myPdp {} listOfDesignated pdp ID: {}" + + " has higher priority than pdp ID: {}", myPdp.getPdpId(), pdp.getPdpId(), + lowestPriorityDifferentSite.getPdpId()); + return pdp; + } } - //now we have a valid value for lowestPriorityPdp - logger.debug("\n\nDesignatedWaiter.run: myPdp: {} listOfDesignated " - + "found the LOWEST priority pdp ID: {} " - + " It is now the designatedPpd from the perspective of myPdp ID: {} \n\n", - myPdp.getPdpId(), lowestPriorityPdp.getPdpId(), myPdp); - designatedPdp = lowestPriorityPdp; - - } else if (listOfDesignated.isEmpty()) { - logger.debug("\nDesignatedWaiter.run: myPdp: {} listOfDesignated is: EMPTY.", myPdp.getPdpId()); - designatedPdp = null; - } else { //only one in listOfDesignated - logger.debug("\nDesignatedWaiter.run: myPdp: {} listOfDesignated " - + "has ONE entry. PDP ID: {}", myPdp.getPdpId(), listOfDesignated.get(0).getPdpId()); - designatedPdp = listOfDesignated.get(0); } - return designatedPdp; + private DroolsPdp getLowestPriority() { + return (lowestPrioritySameSite != null ? lowestPrioritySameSite : lowestPriorityDifferentSite); + } + } + + private void demoteMyPdp() { + try { + //Keep the order like this. StateManagement is last since it triggers controller shutdown + myPdp.setDesignated(false); + pdpsConnector.setDesignated(myPdp, false); + isDesignated = false; + String standbyStatus = stateManagementFeature.getStandbyStatus(); + if (!(standbyStatus.equals(StateManagement.HOT_STANDBY) + || standbyStatus.equals(StateManagement.COLD_STANDBY))) { + /* + * Only call demote if it is not already in the right state. Don't worry about + * synching the lower level topic endpoint states. That is done by the + * refreshStateAudit. + */ + stateManagementFeature.demote(); + } + } catch (Exception e) { + myPdp.setDesignated(false); + pdpsConnector.setDesignated(myPdp, false); + isDesignated = false; + logger.error("DesignatedWaiter.run: myPdp: {} Caught Exception attempting to " + + "demote myPdp {} myPdp.getPdpId(), message= {}", myPdp.getPdpId(), + e); + } } private class TimerUpdateClass extends TimerTask { diff --git a/feature-active-standby-management/src/main/java/org/onap/policy/drools/activestandby/JpaDroolsPdpsConnector.java b/feature-active-standby-management/src/main/java/org/onap/policy/drools/activestandby/JpaDroolsPdpsConnector.java index 078c891f..ed53f55c 100644 --- a/feature-active-standby-management/src/main/java/org/onap/policy/drools/activestandby/JpaDroolsPdpsConnector.java +++ b/feature-active-standby-management/src/main/java/org/onap/policy/drools/activestandby/JpaDroolsPdpsConnector.java @@ -61,18 +61,19 @@ public class JpaDroolsPdpsConnector implements DroolsPdpsConnector { .setFlushMode(FlushModeType.COMMIT).getResultList(); LinkedList<DroolsPdp> droolsPdpsReturnList = new LinkedList<>(); for (Object o : droolsPdpsList) { - if (o instanceof DroolsPdp) { - //Make sure it is not a cached version - em.refresh((DroolsPdpEntity)o); - droolsPdpsReturnList.add((DroolsPdp)o); - if (logger.isDebugEnabled()) { - DroolsPdp droolsPdp = (DroolsPdp)o; - logger.debug("getDroolsPdps: PDP= {}" - + ", isDesignated= {}" - + ", updatedDate= {}" - + ", priority= {}", droolsPdp.getPdpId(), droolsPdp.isDesignated(), - droolsPdp.getUpdatedDate(), droolsPdp.getPriority()); - } + if (!(o instanceof DroolsPdp)) { + continue; + } + //Make sure it is not a cached version + DroolsPdp droolsPdp = (DroolsPdp)o; + em.refresh(droolsPdp); + droolsPdpsReturnList.add(droolsPdp); + if (logger.isDebugEnabled()) { + logger.debug("getDroolsPdps: PDP= {}" + + ", isDesignated= {}" + + ", updatedDate= {}" + + ", priority= {}", droolsPdp.getPdpId(), droolsPdp.isDesignated(), + droolsPdp.getUpdatedDate(), droolsPdp.getPriority()); } } try { @@ -233,14 +234,7 @@ public class JpaDroolsPdpsConnector implements DroolsPdpsConnector { + " found, designated= {}" + ", setting to {}", pdp.getPdpId(), droolsPdpEntity.isDesignated(), designated); - droolsPdpEntity.setDesignated(designated); - if (designated) { - em.refresh(droolsPdpEntity); //make sure we get the DB value - if (!droolsPdpEntity.isDesignated()) { - droolsPdpEntity.setDesignatedDate(new Date()); - } - - } + setPdpDesignation(em, droolsPdpEntity, designated); em.getTransaction().commit(); } else { logger.error("setDesignated: PDP={}" @@ -256,6 +250,17 @@ public class JpaDroolsPdpsConnector implements DroolsPdpsConnector { } + private void setPdpDesignation(EntityManager em, DroolsPdpEntity droolsPdpEntity, boolean designated) { + droolsPdpEntity.setDesignated(designated); + if (designated) { + em.refresh(droolsPdpEntity); //make sure we get the DB value + if (!droolsPdpEntity.isDesignated()) { + droolsPdpEntity.setDesignatedDate(new Date()); + } + + } + } + @Override public void standDownPdp(String pdpId) { diff --git a/feature-active-standby-management/src/main/java/org/onap/policy/drools/activestandby/PmStandbyStateChangeNotifier.java b/feature-active-standby-management/src/main/java/org/onap/policy/drools/activestandby/PmStandbyStateChangeNotifier.java index 3f4ae557..7669cc22 100644 --- a/feature-active-standby-management/src/main/java/org/onap/policy/drools/activestandby/PmStandbyStateChangeNotifier.java +++ b/feature-active-standby-management/src/main/java/org/onap/policy/drools/activestandby/PmStandbyStateChangeNotifier.java @@ -125,138 +125,163 @@ public class PmStandbyStateChangeNotifier extends StateChangeNotifier { if (standbyStatus == null || standbyStatus.equals(StateManagement.NULL_VALUE)) { logger.debug("handleStateChange: standbyStatus is null; standing down PDP={}", pdpId); - if (previousStandbyStatus.equals(StateManagement.NULL_VALUE)) { - // We were just here and did this successfully - logger.debug("handleStateChange: " - + "Is returning because standbyStatus is null and was previously 'null'; PDP={}", - pdpId); - return; - } - isWaitingForActivation = false; - try { - logger.debug("handleStateChange: null: cancelling delayActivationTimer."); - cancelTimer(); - // Only want to lock the endpoints, not the controllers. - getPolicyEngineManager().deactivate(); - // The operation was fully successful, but you cannot assign it a real null value - // because later we might try to execute previousStandbyStatus.equals() and get - // a null pointer exception. - previousStandbyStatus = StateManagement.NULL_VALUE; - } catch (Exception e) { - logger.warn("handleStateChange: standbyStatus == null caught exception: ", e); - } + standDownPdpNull(pdpId); + } else if (standbyStatus.equals(StateManagement.HOT_STANDBY) || standbyStatus.equals(StateManagement.COLD_STANDBY)) { logger.debug("handleStateChange: standbyStatus={}; standing down PDP={}", standbyStatus, pdpId); - if (previousStandbyStatus.equals(PmStandbyStateChangeNotifier.HOTSTANDBY_OR_COLDSTANDBY)) { - // We were just here and did this successfully - logger.debug("handleStateChange: Is returning because standbyStatus is {}" - + " and was previously {}; PDP= {}", standbyStatus, previousStandbyStatus, pdpId); - return; - } - isWaitingForActivation = false; - try { - logger.debug("handleStateChange: HOT_STNDBY || COLD_STANDBY: cancelling delayActivationTimer."); - cancelTimer(); - // Only want to lock the endpoints, not the controllers. - getPolicyEngineManager().deactivate(); - // The operation was fully successful - previousStandbyStatus = PmStandbyStateChangeNotifier.HOTSTANDBY_OR_COLDSTANDBY; - } catch (Exception e) { - logger.warn("handleStateChange: standbyStatus = {} caught exception: {}", standbyStatus, e.getMessage(), - e); - } + standDownPdp(pdpId, standbyStatus); } else if (standbyStatus.equals(StateManagement.PROVIDING_SERVICE)) { logger.debug("handleStateChange: standbyStatus= {} scheduling activation of PDP={}", standbyStatus, pdpId); - if (previousStandbyStatus.equals(StateManagement.PROVIDING_SERVICE)) { - // We were just here and did this successfully - logger.debug("handleStateChange: Is returning because standbyStatus is {}" - + "and was previously {}; PDP={}", standbyStatus, previousStandbyStatus, pdpId); - return; - } - try { - // UnLock all the endpoints - logger.debug("handleStateChange: standbyStatus={}; controllers must be unlocked.", standbyStatus); - /* - * Only endpoints should be unlocked. Controllers have not been locked. Because, - * sometimes, it is possible for more than one PDP-D to become active (race - * conditions) we need to delay the activation of the topic endpoint interfaces to - * give the election algorithm time to resolve the conflict. - */ - logger.debug("handleStateChange: PROVIDING_SERVICE isWaitingForActivation= {}", - isWaitingForActivation); - - // Delay activation for 2*pdpUpdateInterval+2000 ms in case of an election handler - // conflict. - // You could have multiple election handlers thinking they can take over. - - // First let's check that the timer has not died - if (isWaitingForActivation) { - logger.debug("handleStateChange: PROVIDING_SERVICE isWaitingForActivation = {}", - isWaitingForActivation); - long now = new Date().getTime(); - long waitTimeMs = now - startTimeWaitingForActivationMs; - if (waitTimeMs > 3 * waitInterval) { - logger.debug("handleStateChange: PROVIDING_SERVICE looks like the activation wait timer " - + "may be hung, waitTimeMs = {} and allowable waitInterval = {}" - + " Checking whether it is currently in activation. isNowActivating = {}", - waitTimeMs, waitInterval, isNowActivating); - // Now check that it is not currently executing an activation - if (!isNowActivating) { - logger.debug("handleStateChange: PROVIDING_SERVICE looks like the activation " - + "wait timer died"); - // This will assure the timer is cancelled and rescheduled. - isWaitingForActivation = false; - } - } + schedulePdpActivation(pdpId, standbyStatus); - } + } else { + logger.error("handleStateChange: Unsupported standbyStatus={}; standing down PDP={}", standbyStatus, pdpId); + standDownPdpUnsupported(pdpId, standbyStatus); + } - if (!isWaitingForActivation) { - // Just in case there is an old timer hanging around - logger.debug("handleStateChange: PROVIDING_SERVICE cancelling delayActivationTimer."); - cancelTimer(); - delayActivateTimer = makeTimer(); - // delay the activate so the DesignatedWaiter can run twice - delayActivateTimer.schedule(new DelayActivateClass(), waitInterval); - isWaitingForActivation = true; - startTimeWaitingForActivationMs = new Date().getTime(); - logger.debug("handleStateChange: PROVIDING_SERVICE scheduling delayActivationTimer in {} ms", - waitInterval); - } else { - logger.debug("handleStateChange: PROVIDING_SERVICE delayActivationTimer is " - + "waiting for activation."); - } + logger.debug("handleStateChange: Exiting"); + } - } catch (Exception e) { - logger.warn("handleStateChange: PROVIDING_SERVICE standbyStatus == providingservice caught exception: ", - e); - } + private void standDownPdpNull(String pdpId) { + if (previousStandbyStatus.equals(StateManagement.NULL_VALUE)) { + // We were just here and did this successfully + logger.debug("handleStateChange: " + + "Is returning because standbyStatus is null and was previously 'null'; PDP={}", + pdpId); + return; + } - } else { - logger.error("handleStateChange: Unsupported standbyStatus={}; standing down PDP={}", standbyStatus, pdpId); - if (previousStandbyStatus.equals(PmStandbyStateChangeNotifier.UNSUPPORTED)) { - // We were just here and did this successfully - logger.debug("handleStateChange: Is returning because standbyStatus is " - + "UNSUPPORTED and was previously {}; PDP={}", previousStandbyStatus, pdpId); - return; - } + isWaitingForActivation = false; + try { + logger.debug("handleStateChange: null: cancelling delayActivationTimer."); + cancelTimer(); // Only want to lock the endpoints, not the controllers. - isWaitingForActivation = false; - try { - logger.debug("handleStateChange: unsupported standbystatus: cancelling delayActivationTimer."); + getPolicyEngineManager().deactivate(); + // The operation was fully successful, but you cannot assign it a real null value + // because later we might try to execute previousStandbyStatus.equals() and get + // a null pointer exception. + previousStandbyStatus = StateManagement.NULL_VALUE; + } catch (Exception e) { + logger.warn("handleStateChange: standbyStatus == null caught exception: ", e); + } + } + + private void standDownPdp(String pdpId, String standbyStatus) { + if (previousStandbyStatus.equals(PmStandbyStateChangeNotifier.HOTSTANDBY_OR_COLDSTANDBY)) { + // We were just here and did this successfully + logger.debug("handleStateChange: Is returning because standbyStatus is {}" + + " and was previously {}; PDP= {}", standbyStatus, previousStandbyStatus, pdpId); + return; + } + + isWaitingForActivation = false; + try { + logger.debug("handleStateChange: HOT_STNDBY || COLD_STANDBY: cancelling delayActivationTimer."); + cancelTimer(); + // Only want to lock the endpoints, not the controllers. + getPolicyEngineManager().deactivate(); + // The operation was fully successful + previousStandbyStatus = PmStandbyStateChangeNotifier.HOTSTANDBY_OR_COLDSTANDBY; + } catch (Exception e) { + logger.warn("handleStateChange: standbyStatus = {} caught exception: {}", standbyStatus, e.getMessage(), + e); + } + } + + private void schedulePdpActivation(String pdpId, String standbyStatus) { + if (previousStandbyStatus.equals(StateManagement.PROVIDING_SERVICE)) { + // We were just here and did this successfully + logger.debug("handleStateChange: Is returning because standbyStatus is {}" + + "and was previously {}; PDP={}", standbyStatus, previousStandbyStatus, pdpId); + return; + } + + try { + // UnLock all the endpoints + logger.debug("handleStateChange: standbyStatus={}; controllers must be unlocked.", standbyStatus); + /* + * Only endpoints should be unlocked. Controllers have not been locked. Because, + * sometimes, it is possible for more than one PDP-D to become active (race + * conditions) we need to delay the activation of the topic endpoint interfaces to + * give the election algorithm time to resolve the conflict. + */ + logger.debug("handleStateChange: PROVIDING_SERVICE isWaitingForActivation= {}", + isWaitingForActivation); + + // Delay activation for 2*pdpUpdateInterval+2000 ms in case of an election handler + // conflict. + // You could have multiple election handlers thinking they can take over. + + // First let's check that the timer has not died + checkTimerStatus(); + + if (!isWaitingForActivation) { + // Just in case there is an old timer hanging around + logger.debug("handleStateChange: PROVIDING_SERVICE cancelling delayActivationTimer."); cancelTimer(); - getPolicyEngineManager().deactivate(); - // We know the standbystatus is unsupported - previousStandbyStatus = PmStandbyStateChangeNotifier.UNSUPPORTED; - } catch (Exception e) { - logger.warn("handleStateChange: Unsupported standbyStatus = {} " + "caught exception: {} ", - standbyStatus, e.getMessage(), e); + delayActivateTimer = makeTimer(); + // delay the activate so the DesignatedWaiter can run twice + delayActivateTimer.schedule(new DelayActivateClass(), waitInterval); + isWaitingForActivation = true; + startTimeWaitingForActivationMs = new Date().getTime(); + logger.debug("handleStateChange: PROVIDING_SERVICE scheduling delayActivationTimer in {} ms", + waitInterval); + } else { + logger.debug("handleStateChange: PROVIDING_SERVICE delayActivationTimer is " + + "waiting for activation."); } + + } catch (Exception e) { + logger.warn("handleStateChange: PROVIDING_SERVICE standbyStatus == providingservice caught exception: ", + e); + } + } + + private void checkTimerStatus() { + if (isWaitingForActivation) { + logger.debug("handleStateChange: PROVIDING_SERVICE isWaitingForActivation = {}", + isWaitingForActivation); + long now = new Date().getTime(); + long waitTimeMs = now - startTimeWaitingForActivationMs; + if (waitTimeMs > 3 * waitInterval) { + logger.debug("handleStateChange: PROVIDING_SERVICE looks like the activation wait timer " + + "may be hung, waitTimeMs = {} and allowable waitInterval = {}" + + " Checking whether it is currently in activation. isNowActivating = {}", + waitTimeMs, waitInterval, isNowActivating); + // Now check that it is not currently executing an activation + if (!isNowActivating) { + logger.debug("handleStateChange: PROVIDING_SERVICE looks like the activation " + + "wait timer died"); + // This will assure the timer is cancelled and rescheduled. + isWaitingForActivation = false; + } + } + } + } + + private void standDownPdpUnsupported(String pdpId, String standbyStatus) { + if (previousStandbyStatus.equals(PmStandbyStateChangeNotifier.UNSUPPORTED)) { + // We were just here and did this successfully + logger.debug("handleStateChange: Is returning because standbyStatus is " + + "UNSUPPORTED and was previously {}; PDP={}", previousStandbyStatus, pdpId); + return; + } + + // Only want to lock the endpoints, not the controllers. + isWaitingForActivation = false; + try { + logger.debug("handleStateChange: unsupported standbystatus: cancelling delayActivationTimer."); + cancelTimer(); + getPolicyEngineManager().deactivate(); + // We know the standbystatus is unsupported + previousStandbyStatus = PmStandbyStateChangeNotifier.UNSUPPORTED; + } catch (Exception e) { + logger.warn("handleStateChange: Unsupported standbyStatus = {} " + "caught exception: {} ", + standbyStatus, e.getMessage(), e); } - logger.debug("handleStateChange: Exiting"); } private void cancelTimer() { diff --git a/feature-simulators/src/main/java/org/onap/policy/drools/simulators/DMaaPSimulatorJaxRs.java b/feature-simulators/src/main/java/org/onap/policy/drools/simulators/DMaaPSimulatorJaxRs.java index 8b653aae..dd0fb7b8 100644 --- a/feature-simulators/src/main/java/org/onap/policy/drools/simulators/DMaaPSimulatorJaxRs.java +++ b/feature-simulators/src/main/java/org/onap/policy/drools/simulators/DMaaPSimulatorJaxRs.java @@ -99,7 +99,7 @@ public class DMaaPSimulatorJaxRs { return response; } - private String waitForNextMessageFromQueue(int timeout, String topicName) { + protected String waitForNextMessageFromQueue(int timeout, String topicName) { try { sleep(timeout); if (queues.containsKey(topicName)) { @@ -129,10 +129,7 @@ public class DMaaPSimulatorJaxRs { @Consumes(MediaType.TEXT_PLAIN) public String publish(@PathParam("topicName") String topicName, String body) { BlockingQueue<String> queue = queues.computeIfAbsent(topicName, entry -> new LinkedBlockingQueue<>()); - - if (!queue.offer(body)) { - logger.warn("error on topic {}, failed to place body {} on queue", topicName, body); - } + queue.offer(body); return ""; } diff --git a/feature-simulators/src/test/java/org/onap/policy/drools/simulators/DMaaPSimulatorJaxRsTest.java b/feature-simulators/src/test/java/org/onap/policy/drools/simulators/DMaaPSimulatorJaxRsTest.java index 7200bdce..b1275004 100644 --- a/feature-simulators/src/test/java/org/onap/policy/drools/simulators/DMaaPSimulatorJaxRsTest.java +++ b/feature-simulators/src/test/java/org/onap/policy/drools/simulators/DMaaPSimulatorJaxRsTest.java @@ -21,6 +21,7 @@ package org.onap.policy.drools.simulators; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.doThrow; import java.io.IOException; @@ -30,6 +31,7 @@ import java.util.List; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; import javax.servlet.http.HttpServletResponse; import org.junit.After; import org.junit.Before; @@ -130,6 +132,16 @@ public class DMaaPSimulatorJaxRsTest { @Test public void testWaitForNextMessageFromQueue() throws InterruptedException { + CountDownLatch waitCalled = new CountDownLatch(1); + + sim = new DMaaPSimulatorJaxRs() { + @Override + protected String waitForNextMessageFromQueue(int timeout, String topicName) { + waitCalled.countDown(); + return super.waitForNextMessageFromQueue(timeout, topicName); + } + }; + BlockingQueue<String> queue = new LinkedBlockingQueue<>(); CountDownLatch latch1 = backgroundSubscribe(queue); @@ -143,7 +155,7 @@ public class DMaaPSimulatorJaxRsTest { * Must pause to prevent the topic from being created before subscribe() is * invoked. */ - Thread.sleep(LONG_TIMEOUT_MS / 3); + assertTrue(waitCalled.await(1, TimeUnit.SECONDS)); // only publish one message sim.publish(TOPIC, MESSAGE); diff --git a/feature-state-management/src/main/java/org/onap/policy/drools/statemanagement/RepositoryAudit.java b/feature-state-management/src/main/java/org/onap/policy/drools/statemanagement/RepositoryAudit.java index 9d39a7bf..dc0f3784 100644 --- a/feature-state-management/src/main/java/org/onap/policy/drools/statemanagement/RepositoryAudit.java +++ b/feature-state-management/src/main/java/org/onap/policy/drools/statemanagement/RepositoryAudit.java @@ -30,9 +30,9 @@ import java.nio.file.Path; import java.nio.file.SimpleFileVisitor; import java.nio.file.attribute.BasicFileAttributes; import java.util.LinkedList; +import java.util.List; import java.util.Properties; import java.util.concurrent.TimeUnit; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -73,96 +73,172 @@ public class RepositoryAudit extends DroolsPdpIntegrityMonitor.AuditBase { throws IOException, InterruptedException { logger.debug("Running 'RepositoryAudit.invoke'"); - boolean isActive = true; - // ignore errors by default - boolean ignoreErrors = true; - String repoAuditIsActive = StateManagementProperties.getProperty("repository.audit.is.active"); - String repoAuditIgnoreErrors = - StateManagementProperties.getProperty("repository.audit.ignore.errors"); + InvokeData data = new InvokeData(); + logger.debug("RepositoryAudit.invoke: repoAuditIsActive = {}" - + ", repoAuditIgnoreErrors = {}",repoAuditIsActive, repoAuditIgnoreErrors); - - if (repoAuditIsActive != null) { - try { - isActive = Boolean.parseBoolean(repoAuditIsActive.trim()); - } catch (NumberFormatException e) { - logger.warn("RepositoryAudit.invoke: Ignoring invalid property: repository.audit.is.active = {}", - repoAuditIsActive); - } - } + + ", repoAuditIgnoreErrors = {}",data.repoAuditIsActive, data.repoAuditIgnoreErrors); - if (!isActive) { - logger.info("RepositoryAudit.invoke: exiting because isActive = {}", isActive); + data.initIsActive(); + + if (!data.isActive) { + logger.info("RepositoryAudit.invoke: exiting because isActive = {}", data.isActive); return; } - if (repoAuditIgnoreErrors != null) { - try { - ignoreErrors = Boolean.parseBoolean(repoAuditIgnoreErrors.trim()); - } catch (NumberFormatException e) { - ignoreErrors = true; - logger.warn("RepositoryAudit.invoke: Ignoring invalid property: repository.audit.ignore.errors = {}", - repoAuditIgnoreErrors); - } - } else { - ignoreErrors = true; - } + data.initIgnoreErrors(); + data.initTimeout(); - // Fetch repository information from 'IntegrityMonitorProperties' - String repositoryId = - StateManagementProperties.getProperty("repository.audit.id"); - String repositoryUrl = - StateManagementProperties.getProperty("repository.audit.url"); - String repositoryUsername = - StateManagementProperties.getProperty("repository.audit.username"); - String repositoryPassword = - StateManagementProperties.getProperty("repository.audit.password"); - boolean upload = - repositoryId != null && repositoryUrl != null - && repositoryUsername != null && repositoryPassword != null; + /* + * 1) create temporary directory + */ + data.dir = Files.createTempDirectory("auditRepo"); + logger.info("RepositoryAudit: temporary directory = {}", data.dir); - // used to incrementally construct response as problems occur - // (empty = no problems) - StringBuilder response = new StringBuilder(); - - long timeoutInSeconds = DEFAULT_TIMEOUT; - String timeoutString = - StateManagementProperties.getProperty("repository.audit.timeout"); - if (timeoutString != null && !timeoutString.isEmpty()) { - try { - timeoutInSeconds = Long.valueOf(timeoutString); - } catch (NumberFormatException e) { - logger.error("RepositoryAudit: Invalid 'repository.audit.timeout' value: '{}'", - timeoutString, e); - if (!ignoreErrors) { - response.append("Invalid 'repository.audit.timeout' value: '") - .append(timeoutString).append("'\n"); - setResponse(response.toString()); - } - } + // nested 'pom.xml' file and 'repo' directory + final Path pom = data.dir.resolve("pom.xml"); + final Path repo = data.dir.resolve("repo"); + + /* + * 2) Create test file, and upload to repository + * (only if repository information is specified) + */ + if (data.upload) { + data.uploadTestFile(); } - // artifacts to be downloaded - LinkedList<Artifact> artifacts = new LinkedList<>(); + /* + * 3) create 'pom.xml' file in temporary directory + */ + data.createPomFile(repo, pom); /* - * 1) create temporary directory + * 4) Invoke external 'mvn' process to do the downloads */ - Path dir = Files.createTempDirectory("auditRepo"); - logger.info("RepositoryAudit: temporary directory = {}", dir); - // nested 'pom.xml' file and 'repo' directory - final Path pom = dir.resolve("pom.xml"); - final Path repo = dir.resolve("repo"); + // output file = ${dir}/out (this supports step '4a') + File output = data.dir.resolve("out").toFile(); + + // invoke process, and wait for response + int rval = data.runMaven(output); /* - * 2) Create test file, and upload to repository + * 4a) Check attempted and successful downloads from output file + * Note: at present, this step just generates log messages, + * but doesn't do any verification. + */ + if (rval == 0 && output != null) { + generateDownloadLogs(output); + } + + /* + * 5) Check the contents of the directory to make sure the downloads + * were successful + */ + data.verifyDownloads(repo); + + /* + * 6) Use 'curl' to delete the uploaded test file * (only if repository information is specified) */ - String groupId = null; - String artifactId = null; - String version = null; - if (upload) { + if (data.upload) { + data.deleteUploadedTestFile(); + } + + /* + * 7) Remove the temporary directory + */ + Files.walkFileTree(data.dir, new RecursivelyDeleteDirectory()); + } + + private class InvokeData { + private boolean isActive = true; + + // ignore errors by default + private boolean ignoreErrors = true; + + private final String repoAuditIsActive; + private final String repoAuditIgnoreErrors; + + private final String repositoryId; + private final String repositoryUrl; + private final String repositoryUsername; + private final String repositoryPassword; + private final boolean upload; + + // used to incrementally construct response as problems occur + // (empty = no problems) + private final StringBuilder response = new StringBuilder(); + + private long timeoutInSeconds = DEFAULT_TIMEOUT; + + private Path dir; + + private String groupId = null; + private String artifactId = null; + private String version = null; + + // artifacts to be downloaded + private final List<Artifact> artifacts = new LinkedList<>(); + + public InvokeData() { + repoAuditIsActive = StateManagementProperties.getProperty("repository.audit.is.active"); + repoAuditIgnoreErrors = StateManagementProperties.getProperty("repository.audit.ignore.errors"); + + // Fetch repository information from 'IntegrityMonitorProperties' + repositoryId = StateManagementProperties.getProperty("repository.audit.id"); + repositoryUrl = StateManagementProperties.getProperty("repository.audit.url"); + repositoryUsername = StateManagementProperties.getProperty("repository.audit.username"); + repositoryPassword = StateManagementProperties.getProperty("repository.audit.password"); + + upload = repositoryId != null && repositoryUrl != null + && repositoryUsername != null && repositoryPassword != null; + } + + public void initIsActive() { + if (repoAuditIsActive != null) { + try { + isActive = Boolean.parseBoolean(repoAuditIsActive.trim()); + } catch (NumberFormatException e) { + logger.warn("RepositoryAudit.invoke: Ignoring invalid property: repository.audit.is.active = {}", + repoAuditIsActive); + } + } + } + + public void initIgnoreErrors() { + if (repoAuditIgnoreErrors != null) { + try { + ignoreErrors = Boolean.parseBoolean(repoAuditIgnoreErrors.trim()); + } catch (NumberFormatException e) { + ignoreErrors = true; + logger.warn( + "RepositoryAudit.invoke: Ignoring invalid property: repository.audit.ignore.errors = {}", + repoAuditIgnoreErrors); + } + } else { + ignoreErrors = true; + } + } + + public void initTimeout() { + String timeoutString = + StateManagementProperties.getProperty("repository.audit.timeout"); + if (timeoutString != null && !timeoutString.isEmpty()) { + try { + timeoutInSeconds = Long.valueOf(timeoutString); + } catch (NumberFormatException e) { + logger.error("RepositoryAudit: Invalid 'repository.audit.timeout' value: '{}'", + timeoutString, e); + if (!ignoreErrors) { + response.append("Invalid 'repository.audit.timeout' value: '") + .append(timeoutString).append("'\n"); + setResponse(response.toString()); + } + } + } + } + + private void uploadTestFile() throws IOException, InterruptedException { groupId = "org.onap.policy.audit"; artifactId = "repository-audit"; version = "0." + System.currentTimeMillis(); @@ -204,155 +280,105 @@ public class RepositoryAudit extends DroolsPdpIntegrityMonitor.AuditBase { } } - /* - * 3) create 'pom.xml' file in temporary directory - */ - artifacts.add(new Artifact("org.apache.maven/maven-embedder/3.2.2")); - - StringBuilder sb = new StringBuilder(); - sb.append("<project xmlns=\"http://maven.apache.org/POM/4.0.0\" xmlns:xsi=\"http://www.w3.org/2001/XMLSchema-instance\"\n" - + " xsi:schemaLocation=\"http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd\">\n" - + "\n" - + " <modelVersion>4.0.0</modelVersion>\n" - + " <groupId>empty</groupId>\n" - + " <artifactId>empty</artifactId>\n" - + " <version>1.0-SNAPSHOT</version>\n" - + " <packaging>pom</packaging>\n" - + "\n" - + " <build>\n" - + " <plugins>\n" - + " <plugin>\n" - + " <groupId>org.apache.maven.plugins</groupId>\n" - + " <artifactId>maven-dependency-plugin</artifactId>\n" - + " <version>2.10</version>\n" - + " <executions>\n" - + " <execution>\n" - + " <id>copy</id>\n" - + " <goals>\n" - + " <goal>copy</goal>\n" - + " </goals>\n" - + " <configuration>\n" - + " <localRepositoryDirectory>") - .append(repo) - .append("</localRepositoryDirectory>\n") - .append(" <artifactItems>\n"); - - for (Artifact artifact : artifacts) { - // each artifact results in an 'artifactItem' element - sb.append(" <artifactItem>\n" - + " <groupId>") - .append(artifact.groupId) - .append("</groupId>\n" - + " <artifactId>") - .append(artifact.artifactId) - .append("</artifactId>\n" - + " <version>") - .append(artifact.version) - .append("</version>\n" - + " <type>") - .append(artifact.type) - .append("</type>\n" - + " </artifactItem>\n"); - } - sb.append(" </artifactItems>\n" - + " </configuration>\n" - + " </execution>\n" - + " </executions>\n" - + " </plugin>\n" - + " </plugins>\n" - + " </build>\n" - + "</project>\n"); - - try (FileOutputStream fos = new FileOutputStream(pom.toFile())) { - fos.write(sb.toString().getBytes()); - } - - /* - * 4) Invoke external 'mvn' process to do the downloads - */ - - // output file = ${dir}/out (this supports step '4a') - File output = dir.resolve("out").toFile(); - - // invoke process, and wait for response - int rval = runProcess(timeoutInSeconds, dir.toFile(), output, "mvn", "compile"); - logger.info("RepositoryAudit: 'mvn' return value = {}", rval); - if (rval != 0) { - logger.error("RepositoryAudit: 'mvn compile' invocation failed"); - if (!ignoreErrors) { - response.append("'mvn compile' invocation failed\n"); - setResponse(response.toString()); + private void createPomFile(final Path repo, final Path pom) + throws IOException { + + artifacts.add(new Artifact("org.apache.maven/maven-embedder/3.2.2")); + + StringBuilder sb = new StringBuilder(); + sb.append("<project xmlns=\"http://maven.apache.org/POM/4.0.0\" xmlns:xsi=\"http://www.w3.org/2001/XMLSchema-instance\"\n" + + " xsi:schemaLocation=\"http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd\">\n" + + "\n" + + " <modelVersion>4.0.0</modelVersion>\n" + + " <groupId>empty</groupId>\n" + + " <artifactId>empty</artifactId>\n" + + " <version>1.0-SNAPSHOT</version>\n" + + " <packaging>pom</packaging>\n" + + "\n" + + " <build>\n" + + " <plugins>\n" + + " <plugin>\n" + + " <groupId>org.apache.maven.plugins</groupId>\n" + + " <artifactId>maven-dependency-plugin</artifactId>\n" + + " <version>2.10</version>\n" + + " <executions>\n" + + " <execution>\n" + + " <id>copy</id>\n" + + " <goals>\n" + + " <goal>copy</goal>\n" + + " </goals>\n" + + " <configuration>\n" + + " <localRepositoryDirectory>") + .append(repo) + .append("</localRepositoryDirectory>\n") + .append(" <artifactItems>\n"); + + for (Artifact artifact : artifacts) { + // each artifact results in an 'artifactItem' element + sb.append(" <artifactItem>\n" + + " <groupId>") + .append(artifact.groupId) + .append("</groupId>\n" + + " <artifactId>") + .append(artifact.artifactId) + .append("</artifactId>\n" + + " <version>") + .append(artifact.version) + .append("</version>\n" + + " <type>") + .append(artifact.type) + .append("</type>\n" + + " </artifactItem>\n"); } - } - - /* - * 4a) Check attempted and successful downloads from output file - * Note: at present, this step just generates log messages, - * but doesn't do any verification. - */ - if (rval == 0 && output != null) { - // place output in 'fileContents' (replacing the Return characters - // with Newline) - byte[] outputData = new byte[(int)output.length()]; - String fileContents; - try (FileInputStream fis = new FileInputStream(output)) { - // - // Ideally this should be in a loop or even better use - // Java 8 nio functionality. - // - int bytesRead = fis.read(outputData); - logger.info("fileContents read {} bytes", bytesRead); - fileContents = new String(outputData).replace('\r','\n'); + sb.append(" </artifactItems>\n" + + " </configuration>\n" + + " </execution>\n" + + " </executions>\n" + + " </plugin>\n" + + " </plugins>\n" + + " </build>\n" + + "</project>\n"); + + try (FileOutputStream fos = new FileOutputStream(pom.toFile())) { + fos.write(sb.toString().getBytes()); } + } - // generate log messages from 'Downloading' and 'Downloaded' - // messages within the 'mvn' output - int index = 0; - while ((index = fileContents.indexOf("\nDown", index)) > 0) { - index += 5; - if (fileContents.regionMatches(index, "loading: ", 0, 9)) { - index += 9; - int endIndex = fileContents.indexOf('\n', index); - logger.info("RepositoryAudit: Attempted download: '{}'", - fileContents.substring(index, endIndex)); - index = endIndex; - } else if (fileContents.regionMatches(index, "loaded: ", 0, 8)) { - index += 8; - int endIndex = fileContents.indexOf(' ', index); - logger.info("RepositoryAudit: Successful download: '{}'",fileContents.substring(index, endIndex)); - index = endIndex; + private int runMaven(File output) throws IOException, InterruptedException { + int rval = runProcess(timeoutInSeconds, dir.toFile(), output, "mvn", "compile"); + logger.info("RepositoryAudit: 'mvn' return value = {}", rval); + if (rval != 0) { + logger.error("RepositoryAudit: 'mvn compile' invocation failed"); + if (!ignoreErrors) { + response.append("'mvn compile' invocation failed\n"); + setResponse(response.toString()); } } + return rval; } - /* - * 5) Check the contents of the directory to make sure the downloads - * were successful - */ - for (Artifact artifact : artifacts) { - if (repo.resolve(artifact.groupId.replace('.','/')) - .resolve(artifact.artifactId) - .resolve(artifact.version) - .resolve(artifact.artifactId + "-" + artifact.version + "." - + artifact.type).toFile().exists()) { - // artifact exists, as expected - logger.info("RepositoryAudit: {} : exists", artifact.toString()); - } else { - // Audit ERROR: artifact download failed for some reason - logger.error("RepositoryAudit: {}: does not exist", artifact.toString()); - if (!ignoreErrors) { - response.append("Failed to download artifact: ") - .append(artifact).append('\n'); - setResponse(response.toString()); + private void verifyDownloads(final Path repo) { + for (Artifact artifact : artifacts) { + if (repo.resolve(artifact.groupId.replace('.','/')) + .resolve(artifact.artifactId) + .resolve(artifact.version) + .resolve(artifact.artifactId + "-" + artifact.version + "." + + artifact.type).toFile().exists()) { + // artifact exists, as expected + logger.info("RepositoryAudit: {} : exists", artifact.toString()); + } else { + // Audit ERROR: artifact download failed for some reason + logger.error("RepositoryAudit: {}: does not exist", artifact.toString()); + if (!ignoreErrors) { + response.append("Failed to download artifact: ") + .append(artifact).append('\n'); + setResponse(response.toString()); + } } } } - /* - * 6) Use 'curl' to delete the uploaded test file - * (only if repository information is specified) - */ - if (upload) { + private void deleteUploadedTestFile() throws IOException, InterruptedException { if (runProcess(timeoutInSeconds, dir.toFile(), null, "curl", "--request", "DELETE", @@ -370,11 +396,41 @@ public class RepositoryAudit extends DroolsPdpIntegrityMonitor.AuditBase { artifacts.add(new Artifact(groupId, artifactId, version, "txt")); } } + } - /* - * 7) Remove the temporary directory - */ - Files.walkFileTree(dir, new RecursivelyDeleteDirectory()); + private void generateDownloadLogs(File output) throws IOException { + // place output in 'fileContents' (replacing the Return characters + // with Newline) + byte[] outputData = new byte[(int)output.length()]; + String fileContents; + try (FileInputStream fis = new FileInputStream(output)) { + // + // Ideally this should be in a loop or even better use + // Java 8 nio functionality. + // + int bytesRead = fis.read(outputData); + logger.info("fileContents read {} bytes", bytesRead); + fileContents = new String(outputData).replace('\r','\n'); + } + + // generate log messages from 'Downloading' and 'Downloaded' + // messages within the 'mvn' output + int index = 0; + while ((index = fileContents.indexOf("\nDown", index)) > 0) { + index += 5; + if (fileContents.regionMatches(index, "loading: ", 0, 9)) { + index += 9; + int endIndex = fileContents.indexOf('\n', index); + logger.info("RepositoryAudit: Attempted download: '{}'", + fileContents.substring(index, endIndex)); + index = endIndex; + } else if (fileContents.regionMatches(index, "loaded: ", 0, 8)) { + index += 8; + int endIndex = fileContents.indexOf(' ', index); + logger.info("RepositoryAudit: Successful download: '{}'",fileContents.substring(index, endIndex)); + index = endIndex; + } + } } /** diff --git a/policy-core/src/main/java/org/onap/policy/drools/core/PolicyContainer.java b/policy-core/src/main/java/org/onap/policy/drools/core/PolicyContainer.java index 584b3845..bd97ddb1 100644 --- a/policy-core/src/main/java/org/onap/policy/drools/core/PolicyContainer.java +++ b/policy-core/src/main/java/org/onap/policy/drools/core/PolicyContainer.java @@ -448,24 +448,30 @@ public class PolicyContainer implements Startable { */ public synchronized void startScanner(ReleaseId releaseId) { String version = releaseId.getVersion(); - if (!scannerStarted && scanner == null && version != null - && ("LATEST".equals(version) || "RELEASE".equals(version) || version.endsWith("-SNAPSHOT"))) { - // create the scanner, and poll at 60 second intervals - try { - scannerStarted = true; - - // start this in a separate thread -- it can block for a long time - new Thread("Scanner Starter " + getName()) { - @Override - public void run() { - scanner = kieServices.newKieScanner(kieContainer); - scanner.start(60000L); - } - }.start(); - } catch (Exception e) { - // sometimes the scanner initialization fails for some reason - logger.error("startScanner error", e); - } + + if (scannerStarted || scanner != null || version == null) { + return; + } + + if (!("LATEST".equals(version) || "RELEASE".equals(version) || version.endsWith("-SNAPSHOT"))) { + return; + } + + // create the scanner, and poll at 60 second intervals + try { + scannerStarted = true; + + // start this in a separate thread -- it can block for a long time + new Thread("Scanner Starter " + getName()) { + @Override + public void run() { + scanner = kieServices.newKieScanner(kieContainer); + scanner.start(60000L); + } + }.start(); + } catch (Exception e) { + // sometimes the scanner initialization fails for some reason + logger.error("startScanner error", e); } } diff --git a/policy-management/pom.xml b/policy-management/pom.xml index 857e62cf..60175f6c 100644 --- a/policy-management/pom.xml +++ b/policy-management/pom.xml @@ -330,5 +330,12 @@ <scope>test</scope> </dependency> + <dependency> + <groupId>org.awaitility</groupId> + <artifactId>awaitility</artifactId> + <version>3.0.0</version> + <scope>test</scope> + </dependency> + </dependencies> </project> diff --git a/policy-management/src/main/java/org/onap/policy/drools/controller/IndexedDroolsControllerFactory.java b/policy-management/src/main/java/org/onap/policy/drools/controller/IndexedDroolsControllerFactory.java index 733a492d..a4c546f8 100644 --- a/policy-management/src/main/java/org/onap/policy/drools/controller/IndexedDroolsControllerFactory.java +++ b/policy-management/src/main/java/org/onap/policy/drools/controller/IndexedDroolsControllerFactory.java @@ -171,8 +171,6 @@ class IndexedDroolsControllerFactory implements DroolsControllerFactory { protected List<TopicCoderFilterConfiguration> codersAndFilters(Properties properties, List<? extends Topic> topicEntities) { - String propertyTopicEntityPrefix; - List<TopicCoderFilterConfiguration> topics2DecodedClasses2Filters = new ArrayList<>(); if (topicEntities == null || topicEntities.isEmpty()) { @@ -181,87 +179,102 @@ class IndexedDroolsControllerFactory implements DroolsControllerFactory { for (Topic topic : topicEntities) { - /* source or sink ? ueb or dmaap? */ - boolean isSource = topic instanceof TopicSource; - CommInfrastructure commInfra = topic.getTopicCommInfrastructure(); - if (commInfra == CommInfrastructure.UEB) { - if (isSource) { - propertyTopicEntityPrefix = PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS + "."; - } else { - propertyTopicEntityPrefix = PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS + "."; - } - } else if (commInfra == CommInfrastructure.DMAAP) { - if (isSource) { - propertyTopicEntityPrefix = PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."; - } else { - propertyTopicEntityPrefix = PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "."; - } - } else if (commInfra == CommInfrastructure.NOOP) { - if (isSource) { - propertyTopicEntityPrefix = PolicyEndPointProperties.PROPERTY_NOOP_SOURCE_TOPICS + "."; - } else { - propertyTopicEntityPrefix = PolicyEndPointProperties.PROPERTY_NOOP_SINK_TOPICS + "."; - } - } else { - throw new IllegalArgumentException("Invalid Communication Infrastructure: " + commInfra); - } - // 1. first the topic String firstTopic = topic.getTopic(); + String propertyTopicEntityPrefix = getPropertyTopicPrefix(topic) + firstTopic; + // 2. check if there is a custom decoder for this topic that the user prefers to use // instead of the ones provided in the platform - String customGson = properties.getProperty(propertyTopicEntityPrefix + firstTopic - + PolicyEndPointProperties.PROPERTY_TOPIC_EVENTS_CUSTOM_MODEL_CODER_GSON_SUFFIX); - - CustomGsonCoder customGsonCoder = null; - if (customGson != null && !customGson.isEmpty()) { - try { - customGsonCoder = new CustomGsonCoder(customGson); - } catch (IllegalArgumentException e) { - logger.warn("{}: cannot create custom-gson-coder {} because of {}", this, customGson, - e.getMessage(), e); - } - } + CustomGsonCoder customGsonCoder = getCustomCoder(properties, propertyTopicEntityPrefix); // 3. second the list of classes associated with each topic String eventClasses = properties - .getProperty(propertyTopicEntityPrefix + firstTopic - + PolicyEndPointProperties.PROPERTY_TOPIC_EVENTS_SUFFIX); + .getProperty(propertyTopicEntityPrefix + PolicyEndPointProperties.PROPERTY_TOPIC_EVENTS_SUFFIX); if (eventClasses == null || eventClasses.isEmpty()) { logger.warn("There are no event classes for topic {}", firstTopic); continue; } - List<PotentialCoderFilter> classes2Filters = new ArrayList<>(); + List<PotentialCoderFilter> classes2Filters = + getFilterExpressions(properties, propertyTopicEntityPrefix, eventClasses); - List<String> topicClasses = new ArrayList<>(Arrays.asList(eventClasses.split("\\s*,\\s*"))); + TopicCoderFilterConfiguration topic2Classes2Filters = + new TopicCoderFilterConfiguration(firstTopic, classes2Filters, customGsonCoder); + topics2DecodedClasses2Filters.add(topic2Classes2Filters); + } - for (String theClass : topicClasses) { + return topics2DecodedClasses2Filters; + } + private String getPropertyTopicPrefix(Topic topic) { + boolean isSource = topic instanceof TopicSource; + CommInfrastructure commInfra = topic.getTopicCommInfrastructure(); + if (commInfra == CommInfrastructure.UEB) { + if (isSource) { + return PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS + "."; + } else { + return PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS + "."; + } + } else if (commInfra == CommInfrastructure.DMAAP) { + if (isSource) { + return PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."; + } else { + return PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "."; + } + } else if (commInfra == CommInfrastructure.NOOP) { + if (isSource) { + return PolicyEndPointProperties.PROPERTY_NOOP_SOURCE_TOPICS + "."; + } else { + return PolicyEndPointProperties.PROPERTY_NOOP_SINK_TOPICS + "."; + } + } else { + throw new IllegalArgumentException("Invalid Communication Infrastructure: " + commInfra); + } + } + + private CustomGsonCoder getCustomCoder(Properties properties, String propertyPrefix) { + String customGson = properties.getProperty(propertyPrefix + + PolicyEndPointProperties.PROPERTY_TOPIC_EVENTS_CUSTOM_MODEL_CODER_GSON_SUFFIX); + + CustomGsonCoder customGsonCoder = null; + if (customGson != null && !customGson.isEmpty()) { + try { + customGsonCoder = new CustomGsonCoder(customGson); + } catch (IllegalArgumentException e) { + logger.warn("{}: cannot create custom-gson-coder {} because of {}", this, customGson, + e.getMessage(), e); + } + } + return customGsonCoder; + } - // 4. third, for each coder class, get the filter expression + private List<PotentialCoderFilter> getFilterExpressions(Properties properties, String propertyPrefix, + String eventClasses) { - String filter = properties - .getProperty(propertyTopicEntityPrefix + firstTopic - + PolicyEndPointProperties.PROPERTY_TOPIC_EVENTS_SUFFIX - + "." + theClass + PolicyEndPointProperties.PROPERTY_TOPIC_EVENTS_FILTER_SUFFIX); + List<PotentialCoderFilter> classes2Filters = new ArrayList<>(); - JsonProtocolFilter protocolFilter = new JsonProtocolFilter(filter); - PotentialCoderFilter class2Filters = new PotentialCoderFilter(theClass, protocolFilter); - classes2Filters.add(class2Filters); - } + List<String> topicClasses = new ArrayList<>(Arrays.asList(eventClasses.split("\\s*,\\s*"))); - TopicCoderFilterConfiguration topic2Classes2Filters = - new TopicCoderFilterConfiguration(firstTopic, classes2Filters, customGsonCoder); - topics2DecodedClasses2Filters.add(topic2Classes2Filters); + for (String theClass : topicClasses) { + + // 4. for each coder class, get the filter expression + + String filter = properties + .getProperty(propertyPrefix + + PolicyEndPointProperties.PROPERTY_TOPIC_EVENTS_SUFFIX + + "." + theClass + PolicyEndPointProperties.PROPERTY_TOPIC_EVENTS_FILTER_SUFFIX); + + JsonProtocolFilter protocolFilter = new JsonProtocolFilter(filter); + PotentialCoderFilter class2Filters = new PotentialCoderFilter(theClass, protocolFilter); + classes2Filters.add(class2Filters); } - return topics2DecodedClasses2Filters; + return classes2Filters; } @Override diff --git a/policy-management/src/main/java/org/onap/policy/drools/controller/internal/MavenDroolsController.java b/policy-management/src/main/java/org/onap/policy/drools/controller/internal/MavenDroolsController.java index ca1f2283..77bfcf9f 100644 --- a/policy-management/src/main/java/org/onap/policy/drools/controller/internal/MavenDroolsController.java +++ b/policy-management/src/main/java/org/onap/policy/drools/controller/internal/MavenDroolsController.java @@ -41,6 +41,7 @@ import org.kie.api.runtime.rule.QueryResultsRow; import org.onap.policy.common.endpoints.event.comm.TopicSink; import org.onap.policy.common.gson.annotation.GsonJsonIgnore; import org.onap.policy.common.gson.annotation.GsonJsonProperty; +import org.onap.policy.common.utils.services.FeatureApiUtils; import org.onap.policy.common.utils.services.OrderedServiceImpl; import org.onap.policy.drools.controller.DroolsController; import org.onap.policy.drools.controller.DroolsControllerConstants; @@ -186,25 +187,11 @@ public class MavenDroolsController implements DroolsController { logger.info("updating version -> [{}:{}:{}]", newGroupId, newArtifactId, newVersion); - if (newGroupId == null || newGroupId.isEmpty()) { - throw new IllegalArgumentException("Missing maven group-id coordinate"); - } + validateText(newGroupId, "Missing maven group-id coordinate"); + validateText(newArtifactId, "Missing maven artifact-id coordinate"); + validateText(newVersion, "Missing maven version coordinate"); - if (newArtifactId == null || newArtifactId.isEmpty()) { - throw new IllegalArgumentException("Missing maven artifact-id coordinate"); - } - - if (newVersion == null || newVersion.isEmpty()) { - throw new IllegalArgumentException("Missing maven version coordinate"); - } - - if (newGroupId.equalsIgnoreCase(DroolsControllerConstants.NO_GROUP_ID) - || newArtifactId.equalsIgnoreCase(DroolsControllerConstants.NO_ARTIFACT_ID) - || newVersion.equalsIgnoreCase(DroolsControllerConstants.NO_VERSION)) { - throw new IllegalArgumentException("BRAINLESS maven coordinates provided: " - + newGroupId + ":" + newArtifactId + ":" - + newVersion); - } + validateHasBrain(newGroupId, newArtifactId, newVersion); if (newGroupId.equalsIgnoreCase(this.getGroupId()) && newArtifactId.equalsIgnoreCase(this.getArtifactId()) @@ -214,13 +201,7 @@ public class MavenDroolsController implements DroolsController { return; } - if (!newGroupId.equalsIgnoreCase(this.getGroupId()) - || !newArtifactId.equalsIgnoreCase(this.getArtifactId())) { - throw new IllegalArgumentException( - "Group ID and Artifact ID maven coordinates must be identical for the upgrade: " - + newGroupId + ":" + newArtifactId + ":" - + newVersion + " vs. " + this); - } + validateNewVersion(newGroupId, newArtifactId, newVersion); /* upgrade */ String messages = this.policyContainer.updateToVersion(newVersion); @@ -239,6 +220,32 @@ public class MavenDroolsController implements DroolsController { logger.info("UPDATE-TO-VERSION: completed {}", this); } + private void validateText(String text, String errorMessage) { + if (text == null || text.isEmpty()) { + throw new IllegalArgumentException(errorMessage); + } + } + + private void validateHasBrain(String newGroupId, String newArtifactId, String newVersion) { + if (newGroupId.equalsIgnoreCase(DroolsControllerConstants.NO_GROUP_ID) + || newArtifactId.equalsIgnoreCase(DroolsControllerConstants.NO_ARTIFACT_ID) + || newVersion.equalsIgnoreCase(DroolsControllerConstants.NO_VERSION)) { + throw new IllegalArgumentException("BRAINLESS maven coordinates provided: " + + newGroupId + ":" + newArtifactId + ":" + + newVersion); + } + } + + private void validateNewVersion(String newGroupId, String newArtifactId, String newVersion) { + if (!newGroupId.equalsIgnoreCase(this.getGroupId()) + || !newArtifactId.equalsIgnoreCase(this.getArtifactId())) { + throw new IllegalArgumentException( + "Group ID and Artifact ID maven coordinates must be identical for the upgrade: " + + newGroupId + ":" + newArtifactId + ":" + + newVersion + " vs. " + this); + } + } + /** * initialize decoders for all the topics supported by this controller * Note this is critical to be done after the Policy Container is @@ -259,18 +266,7 @@ public class MavenDroolsController implements DroolsController { for (TopicCoderFilterConfiguration coderConfig: coderConfigurations) { String topic = coderConfig.getTopic(); - CustomGsonCoder customGsonCoder = coderConfig.getCustomGsonCoder(); - if (customGsonCoder != null - && customGsonCoder.getClassContainer() != null - && !customGsonCoder.getClassContainer().isEmpty()) { - - String customGsonCoderClass = customGsonCoder.getClassContainer(); - if (!isClass(customGsonCoderClass)) { - throw makeRetrieveEx(customGsonCoderClass); - } else { - logClassFetched(customGsonCoderClass); - } - } + CustomGsonCoder customGsonCoder = getCustomCoder(coderConfig); List<PotentialCoderFilter> coderFilters = coderConfig.getCoderFilters(); if (coderFilters == null || coderFilters.isEmpty()) { @@ -308,6 +304,22 @@ public class MavenDroolsController implements DroolsController { } } + private CustomGsonCoder getCustomCoder(TopicCoderFilterConfiguration coderConfig) { + CustomGsonCoder customGsonCoder = coderConfig.getCustomGsonCoder(); + if (customGsonCoder != null + && customGsonCoder.getClassContainer() != null + && !customGsonCoder.getClassContainer().isEmpty()) { + + String customGsonCoderClass = customGsonCoder.getClassContainer(); + if (!isClass(customGsonCoderClass)) { + throw makeRetrieveEx(customGsonCoderClass); + } else { + logClassFetched(customGsonCoderClass); + } + } + return customGsonCoder; + } + /** * Logs an error and makes an exception for an item that cannot be retrieved. * @param itemName the item to retrieve @@ -520,15 +532,11 @@ public class MavenDroolsController implements DroolsController { // Broadcast - for (DroolsControllerFeatureApi feature : getDroolsProviders().getList()) { - try { - if (feature.beforeInsert(this, event)) { - return true; - } - } catch (Exception e) { - logger.error("{}: feature {} before-insert failure because of {}", - this, feature.getClass().getName(), e.getMessage(), e); - } + if (FeatureApiUtils.apply(getDroolsProviders().getList(), + feature -> feature.beforeInsert(this, event), + (feature, ex) -> logger.error("{}: feature {} before-insert failure because of {}", this, + feature.getClass().getName(), ex.getMessage(), ex))) { + return true; } boolean successInject = this.policyContainer.insertAll(event); @@ -536,16 +544,10 @@ public class MavenDroolsController implements DroolsController { logger.warn(this + "Failed to inject into PolicyContainer {}", this.getSessionNames()); } - for (DroolsControllerFeatureApi feature : getDroolsProviders().getList()) { - try { - if (feature.afterInsert(this, event, successInject)) { - return true; - } - } catch (Exception e) { - logger.error("{}: feature {} after-insert failure because of {}", - this, feature.getClass().getName(), e.getMessage(), e); - } - } + FeatureApiUtils.apply(getDroolsProviders().getList(), + feature -> feature.afterInsert(this, event, successInject), + (feature, ex) -> logger.error("{}: feature {} after-insert failure because of {}", this, + feature.getClass().getName(), ex.getMessage(), ex)); return true; @@ -840,18 +842,7 @@ public class MavenDroolsController implements DroolsController { PolicySession session = getSession(sessionName); KieSession kieSession = session.getKieSession(); - boolean found = false; - for (KiePackage kiePackage : kieSession.getKieBase().getKiePackages()) { - for (Query q : kiePackage.getQueries()) { - if (q.getName() != null && q.getName().equals(queryName)) { - found = true; - break; - } - } - } - if (!found) { - throw new IllegalArgumentException("Invalid Query Name: " + queryName); - } + validateQueryName(kieSession, queryName); List<Object> factObjects = new ArrayList<>(); @@ -870,6 +861,18 @@ public class MavenDroolsController implements DroolsController { return factObjects; } + private void validateQueryName(KieSession kieSession, String queryName) { + for (KiePackage kiePackage : kieSession.getKieBase().getKiePackages()) { + for (Query q : kiePackage.getQueries()) { + if (q.getName() != null && q.getName().equals(queryName)) { + return; + } + } + } + + throw new IllegalArgumentException("Invalid Query Name: " + queryName); + } + @Override public <T> boolean delete(@NonNull String sessionName, @NonNull T fact) { String factClassName = fact.getClass().getName(); diff --git a/policy-management/src/main/java/org/onap/policy/drools/protocol/coders/GenericEventProtocolCoder.java b/policy-management/src/main/java/org/onap/policy/drools/protocol/coders/GenericEventProtocolCoder.java index 89a7a420..cb4ce07e 100644 --- a/policy-management/src/main/java/org/onap/policy/drools/protocol/coders/GenericEventProtocolCoder.java +++ b/policy-management/src/main/java/org/onap/policy/drools/protocol/coders/GenericEventProtocolCoder.java @@ -124,40 +124,44 @@ abstract class GenericEventProtocolCoder { coders.put(key, coderTools); - if (reverseCoders.containsKey(reverseKey)) { - // There is another controller (different group id/artifact id/topic) - // that shares the class and the topic. - - List<ProtocolCoderToolset> toolsets = - reverseCoders.get(reverseKey); - boolean present = false; - for (ProtocolCoderToolset parserSet : toolsets) { - // just doublecheck - present = parserSet.getControllerId().equals(key); - if (present) { - /* anomaly */ - logger.error( - "{}: unexpected toolset reverse mapping found for {}:{}: {}", - this, - reverseKey, - key, - parserSet); - } - } + addReverseCoder(coderTools, key, reverseKey); + } + } + private void addReverseCoder(GsonProtocolCoderToolset coderTools, String key, String reverseKey) { + if (reverseCoders.containsKey(reverseKey)) { + // There is another controller (different group id/artifact id/topic) + // that shares the class and the topic. + + List<ProtocolCoderToolset> toolsets = + reverseCoders.get(reverseKey); + boolean present = false; + for (ProtocolCoderToolset parserSet : toolsets) { + // just doublecheck + present = parserSet.getControllerId().equals(key); if (present) { - return; - } else { - logger.info("{}: adding coder set for {}: {} ", this, reverseKey, coderTools); - toolsets.add(coderTools); + /* anomaly */ + logger.error( + "{}: unexpected toolset reverse mapping found for {}:{}: {}", + this, + reverseKey, + key, + parserSet); } + } + + if (present) { + return; } else { - List<ProtocolCoderToolset> toolsets = new ArrayList<>(); + logger.info("{}: adding coder set for {}: {} ", this, reverseKey, coderTools); toolsets.add(coderTools); - - logger.info("{}: adding toolset for reverse key {}: {}", this, reverseKey, toolsets); - reverseCoders.put(reverseKey, toolsets); } + } else { + List<ProtocolCoderToolset> toolsets = new ArrayList<>(); + toolsets.add(coderTools); + + logger.info("{}: adding toolset for reverse key {}: {}", this, reverseKey, toolsets); + reverseCoders.put(reverseKey, toolsets); } } @@ -217,30 +221,36 @@ abstract class GenericEventProtocolCoder { for (CoderFilters codeFilter : coderToolset.getCoders()) { String className = codeFilter.getCodedClass(); String reverseKey = this.reverseCodersKey(topic, className); - if (this.reverseCoders.containsKey(reverseKey)) { - List<ProtocolCoderToolset> toolsets = - this.reverseCoders.get(reverseKey); - Iterator<ProtocolCoderToolset> toolsetsIter = - toolsets.iterator(); - while (toolsetsIter.hasNext()) { - ProtocolCoderToolset toolset = toolsetsIter.next(); - if (toolset.getControllerId().equals(key)) { - logger.info( - "{}: removed coder from toolset for {} from reverse mapping", this, reverseKey); - toolsetsIter.remove(); - } - } - - if (this.reverseCoders.get(reverseKey).isEmpty()) { - logger.info("{}: removing reverse mapping for {}: ", this, reverseKey); - this.reverseCoders.remove(reverseKey); - } - } + removeReverseCoder(key, reverseKey); } } } } + private void removeReverseCoder(String key, String reverseKey) { + if (!this.reverseCoders.containsKey(reverseKey)) { + return; + } + + List<ProtocolCoderToolset> toolsets = + this.reverseCoders.get(reverseKey); + Iterator<ProtocolCoderToolset> toolsetsIter = + toolsets.iterator(); + while (toolsetsIter.hasNext()) { + ProtocolCoderToolset toolset = toolsetsIter.next(); + if (toolset.getControllerId().equals(key)) { + logger.info( + "{}: removed coder from toolset for {} from reverse mapping", this, reverseKey); + toolsetsIter.remove(); + } + } + + if (this.reverseCoders.get(reverseKey).isEmpty()) { + logger.info("{}: removing reverse mapping for {}: ", this, reverseKey); + this.reverseCoders.remove(reverseKey); + } + } + /** * does it support coding. * @@ -446,20 +456,7 @@ abstract class GenericEventProtocolCoder { } for (ProtocolCoderToolset encoderSet : toolsets) { - // figure out the right toolset - String groupId = encoderSet.getGroupId(); - String artifactId = encoderSet.getArtifactId(); - List<CoderFilters> coderFilters = encoderSet.getCoders(); - for (CoderFilters coder : coderFilters) { - if (coder.getCodedClass().equals(encodedClass.getClass().getName())) { - DroolsController droolsController = - DroolsControllerConstants.getFactory().get(groupId, artifactId, ""); - if (droolsController.ownsCoder( - encodedClass.getClass(), coder.getModelClassLoaderHash())) { - droolsControllers.add(droolsController); - } - } - } + addToolsetControllers(droolsControllers, encodedClass, encoderSet); } if (droolsControllers.isEmpty()) { @@ -473,6 +470,24 @@ abstract class GenericEventProtocolCoder { return droolsControllers; } + private void addToolsetControllers(List<DroolsController> droolsControllers, Object encodedClass, + ProtocolCoderToolset encoderSet) { + // figure out the right toolset + String groupId = encoderSet.getGroupId(); + String artifactId = encoderSet.getArtifactId(); + List<CoderFilters> coderFilters = encoderSet.getCoders(); + for (CoderFilters coder : coderFilters) { + if (coder.getCodedClass().equals(encodedClass.getClass().getName())) { + DroolsController droolsController = + DroolsControllerConstants.getFactory().get(groupId, artifactId, ""); + if (droolsController.ownsCoder( + encodedClass.getClass(), coder.getModelClassLoaderHash())) { + droolsControllers.add(droolsController); + } + } + } + } + /** * get all filters by maven coordinates and topic. * diff --git a/policy-management/src/main/java/org/onap/policy/drools/system/PolicyController.java b/policy-management/src/main/java/org/onap/policy/drools/system/PolicyController.java index 17247f41..82cd015e 100644 --- a/policy-management/src/main/java/org/onap/policy/drools/system/PolicyController.java +++ b/policy-management/src/main/java/org/onap/policy/drools/system/PolicyController.java @@ -48,12 +48,12 @@ public interface PolicyController extends Startable, Lockable { /** * Get the topic readers of interest for this controller. */ - List<? extends TopicSource> getTopicSources(); + List<TopicSource> getTopicSources(); /** * Get the topic readers of interest for this controller. */ - List<? extends TopicSink> getTopicSinks(); + List<TopicSink> getTopicSinks(); /** * Get the Drools Controller. diff --git a/policy-management/src/main/java/org/onap/policy/drools/system/PolicyEngineManager.java b/policy-management/src/main/java/org/onap/policy/drools/system/PolicyEngineManager.java index 36d8ca59..32e3f674 100644 --- a/policy-management/src/main/java/org/onap/policy/drools/system/PolicyEngineManager.java +++ b/policy-management/src/main/java/org/onap/policy/drools/system/PolicyEngineManager.java @@ -24,6 +24,7 @@ import static org.onap.policy.drools.system.PolicyEngineConstants.TELEMETRY_SERV import static org.onap.policy.drools.system.PolicyEngineConstants.TELEMETRY_SERVER_DEFAULT_NAME; import static org.onap.policy.drools.system.PolicyEngineConstants.TELEMETRY_SERVER_DEFAULT_PORT; +import com.att.aft.dme2.internal.apache.commons.lang3.StringUtils; import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.gson.Gson; @@ -31,6 +32,11 @@ import com.google.gson.GsonBuilder; import java.util.ArrayList; import java.util.List; import java.util.Properties; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.BiConsumer; +import java.util.function.Consumer; +import java.util.stream.Stream; +import lombok.Getter; import org.onap.policy.common.endpoints.event.comm.Topic; import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure; import org.onap.policy.common.endpoints.event.comm.TopicEndpoint; @@ -43,6 +49,7 @@ import org.onap.policy.common.endpoints.http.server.HttpServletServerFactoryInst import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties; import org.onap.policy.common.gson.annotation.GsonJsonIgnore; import org.onap.policy.common.gson.annotation.GsonJsonProperty; +import org.onap.policy.common.utils.services.FeatureApiUtils; import org.onap.policy.drools.controller.DroolsController; import org.onap.policy.drools.controller.DroolsControllerConstants; import org.onap.policy.drools.core.PolicyContainer; @@ -89,11 +96,13 @@ class PolicyEngineManager implements PolicyEngine { /** * Is the Policy Engine running. */ + @Getter private volatile boolean alive = false; /** * Is the engine locked. */ + @Getter private volatile boolean locked = false; /** @@ -109,16 +118,19 @@ class PolicyEngineManager implements PolicyEngine { /** * Policy Engine Sources. */ - private List<? extends TopicSource> sources = new ArrayList<>(); + @Getter + private List<TopicSource> sources = new ArrayList<>(); /** * Policy Engine Sinks. */ - private List<? extends TopicSink> sinks = new ArrayList<>(); + @Getter + private List<TopicSink> sinks = new ArrayList<>(); /** * Policy Engine HTTP Servers. */ + @Getter private List<HttpServletServer> httpServers = new ArrayList<>(); /** @@ -130,15 +142,11 @@ class PolicyEngineManager implements PolicyEngine { @Override public synchronized void boot(String[] cliArgs) { - for (final PolicyEngineFeatureApi feature : getEngineProviders()) { - try { - if (feature.beforeBoot(this, cliArgs)) { - return; - } - } catch (final Exception e) { - logger.error("{}: feature {} before-boot failure because of {}", this, feature.getClass().getName(), - e.getMessage(), e); - } + if (FeatureApiUtils.apply(getEngineProviders(), + feature -> feature.beforeBoot(this, cliArgs), + (feature, ex) -> logger.error("{}: feature {} before-boot failure because of {}", this, + feature.getClass().getName(), ex.getMessage(), ex))) { + return; } try { @@ -147,16 +155,10 @@ class PolicyEngineManager implements PolicyEngine { logger.error("{}: cannot init policy-container because of {}", this, e.getMessage(), e); } - for (final PolicyEngineFeatureApi feature : getEngineProviders()) { - try { - if (feature.afterBoot(this)) { - return; - } - } catch (final Exception e) { - logger.error("{}: feature {} after-boot failure because of {}", this, feature.getClass().getName(), - e.getMessage(), e); - } - } + FeatureApiUtils.apply(getEngineProviders(), + feature -> feature.afterBoot(this), + (feature, ex) -> logger.error("{}: feature {} after-boot failure because of {}", this, + feature.getClass().getName(), ex.getMessage(), ex)); } @Override @@ -220,15 +222,11 @@ class PolicyEngineManager implements PolicyEngine { } /* policy-engine dispatch pre configure hook */ - for (final PolicyEngineFeatureApi feature : getEngineProviders()) { - try { - if (feature.beforeConfigure(this, properties)) { - return; - } - } catch (final Exception e) { - logger.error("{}: feature {} before-configure failure because of {}", this, - feature.getClass().getName(), e.getMessage(), e); - } + if (FeatureApiUtils.apply(getEngineProviders(), + feature -> feature.beforeConfigure(this, properties), + (feature, ex) -> logger.error("{}: feature {} before-configure failure because of {}", this, + feature.getClass().getName(), ex.getMessage(), ex))) { + return; } this.properties = properties; @@ -260,16 +258,10 @@ class PolicyEngineManager implements PolicyEngine { } /* policy-engine dispatch post configure hook */ - for (final PolicyEngineFeatureApi feature : getEngineProviders()) { - try { - if (feature.afterConfigure(this)) { - return; - } - } catch (final Exception e) { - logger.error("{}: feature {} after-configure failure because of {}", this, feature.getClass().getName(), - e.getMessage(), e); - } - } + FeatureApiUtils.apply(getEngineProviders(), + feature -> feature.afterConfigure(this), + (feature, ex) -> logger.error("{}: feature {} after-configure failure because of {}", this, + feature.getClass().getName(), ex.getMessage(), ex)); } @Override @@ -337,16 +329,11 @@ class PolicyEngineManager implements PolicyEngine { } // feature hook - for (final PolicyControllerFeatureApi controllerFeature : getControllerProviders()) { - try { - if (controllerFeature.afterCreate(controller)) { - return controller; - } - } catch (final Exception e) { - logger.error("{}: feature {} after-controller-create failure because of {}", this, - controllerFeature.getClass().getName(), e.getMessage(), e); - } - } + PolicyController controller2 = controller; + FeatureApiUtils.apply(getControllerProviders(), + feature -> feature.afterCreate(controller2), + (feature, ex) -> logger.error("{}: feature {} after-controller-create failure because of {}", + this, feature.getClass().getName(), ex.getMessage(), ex)); return controller; } @@ -393,7 +380,6 @@ class PolicyEngineManager implements PolicyEngine { throw new IllegalArgumentException("No controller configuration has been provided"); } - PolicyController policyController = null; try { final String operation = configController.getOperation(); if (operation == null || operation.isEmpty()) { @@ -401,75 +387,14 @@ class PolicyEngineManager implements PolicyEngine { throw new IllegalArgumentException("operation must be provided"); } - try { - policyController = getControllerFactory().get(controllerName); - } catch (final IllegalArgumentException e) { - // not found - logger.warn("Policy Controller " + controllerName + " not found", e); - } - + PolicyController policyController = getController(controllerName); if (policyController == null) { - - if (operation.equalsIgnoreCase(ControllerConfiguration.CONFIG_CONTROLLER_OPERATION_LOCK) - || operation.equalsIgnoreCase(ControllerConfiguration.CONFIG_CONTROLLER_OPERATION_UNLOCK)) { - throw new IllegalArgumentException(controllerName + " is not available for operation " + operation); - } - - /* Recovery case */ - - logger.warn("controller {} does not exist. Attempting recovery from disk", controllerName); - - final Properties controllerProperties = - getPersistenceManager().getControllerProperties(controllerName); - - /* - * returned properties cannot be null (per implementation) assert (properties != - * null) - */ - - if (controllerProperties == null) { - throw new IllegalArgumentException(controllerName + " is invalid"); - } - - logger.warn("controller being recovered. {} Reset controller's bad maven coordinates to brainless", - controllerName); - - /* - * try to bring up bad controller in brainless mode, after having it - * working, apply the new create/update operation. - */ - controllerProperties.setProperty(DroolsPropertyConstants.RULES_GROUPID, - DroolsControllerConstants.NO_GROUP_ID); - controllerProperties.setProperty(DroolsPropertyConstants.RULES_ARTIFACTID, - DroolsControllerConstants.NO_ARTIFACT_ID); - controllerProperties.setProperty(DroolsPropertyConstants.RULES_VERSION, - DroolsControllerConstants.NO_VERSION); - - policyController = getPolicyEngine().createPolicyController(controllerName, controllerProperties); + policyController = findController(controllerName, operation); /* fall through to do brain update operation */ } - switch (operation) { - case ControllerConfiguration.CONFIG_CONTROLLER_OPERATION_CREATE: - getControllerFactory().patch(policyController, configController.getDrools()); - break; - case ControllerConfiguration.CONFIG_CONTROLLER_OPERATION_UPDATE: - policyController.unlock(); - getControllerFactory().patch(policyController, configController.getDrools()); - break; - case ControllerConfiguration.CONFIG_CONTROLLER_OPERATION_LOCK: - policyController.lock(); - break; - case ControllerConfiguration.CONFIG_CONTROLLER_OPERATION_UNLOCK: - policyController.unlock(); - break; - default: - final String msg = "Controller Operation Configuration is not supported: " + operation + " for " - + controllerName; - logger.warn(msg); - throw new IllegalArgumentException(msg); - } + updateController(controllerName, policyController, operation, configController); return policyController; } catch (final Exception e) { @@ -481,84 +406,135 @@ class PolicyEngineManager implements PolicyEngine { } } + private PolicyController getController(final String controllerName) { + PolicyController policyController = null; + try { + policyController = getControllerFactory().get(controllerName); + } catch (final IllegalArgumentException e) { + // not found + logger.warn("Policy Controller " + controllerName + " not found", e); + } + return policyController; + } + + private PolicyController findController(final String controllerName, final String operation) { + if (operation.equalsIgnoreCase(ControllerConfiguration.CONFIG_CONTROLLER_OPERATION_LOCK) + || operation.equalsIgnoreCase(ControllerConfiguration.CONFIG_CONTROLLER_OPERATION_UNLOCK)) { + throw new IllegalArgumentException(controllerName + " is not available for operation " + operation); + } + + /* Recovery case */ + + logger.warn("controller {} does not exist. Attempting recovery from disk", controllerName); + + final Properties controllerProperties = + getPersistenceManager().getControllerProperties(controllerName); + + /* + * returned properties cannot be null (per implementation) assert (properties != + * null) + */ + + if (controllerProperties == null) { + throw new IllegalArgumentException(controllerName + " is invalid"); + } + + logger.warn("controller being recovered. {} Reset controller's bad maven coordinates to brainless", + controllerName); + + /* + * try to bring up bad controller in brainless mode, after having it + * working, apply the new create/update operation. + */ + controllerProperties.setProperty(DroolsPropertyConstants.RULES_GROUPID, + DroolsControllerConstants.NO_GROUP_ID); + controllerProperties.setProperty(DroolsPropertyConstants.RULES_ARTIFACTID, + DroolsControllerConstants.NO_ARTIFACT_ID); + controllerProperties.setProperty(DroolsPropertyConstants.RULES_VERSION, + DroolsControllerConstants.NO_VERSION); + + return getPolicyEngine().createPolicyController(controllerName, controllerProperties); + } + + private void updateController(final String controllerName, PolicyController policyController, + final String operation, ControllerConfiguration configController) { + switch (operation) { + case ControllerConfiguration.CONFIG_CONTROLLER_OPERATION_CREATE: + getControllerFactory().patch(policyController, configController.getDrools()); + break; + case ControllerConfiguration.CONFIG_CONTROLLER_OPERATION_UPDATE: + policyController.unlock(); + getControllerFactory().patch(policyController, configController.getDrools()); + break; + case ControllerConfiguration.CONFIG_CONTROLLER_OPERATION_LOCK: + policyController.lock(); + break; + case ControllerConfiguration.CONFIG_CONTROLLER_OPERATION_UNLOCK: + policyController.unlock(); + break; + default: + final String msg = "Controller Operation Configuration is not supported: " + operation + " for " + + controllerName; + logger.warn(msg); + throw new IllegalArgumentException(msg); + } + } + @Override public synchronized boolean start() { /* policy-engine dispatch pre start hook */ - for (final PolicyEngineFeatureApi feature : getEngineProviders()) { - try { - if (feature.beforeStart(this)) { - return true; - } - } catch (final Exception e) { - logger.error("{}: feature {} before-start failure because of {}", this, feature.getClass().getName(), - e.getMessage(), e); - } + if (FeatureApiUtils.apply(getEngineProviders(), + feature -> feature.beforeStart(this), + (feature, ex) -> logger.error("{}: feature {} before-start failure because of {}", this, + feature.getClass().getName(), ex.getMessage(), ex))) { + return true; } - boolean success = true; if (this.locked) { throw new IllegalStateException(ENGINE_LOCKED_MSG); } this.alive = true; + AtomicReference<Boolean> success = new AtomicReference<>(true); + /* Start Policy Engine exclusively-owned (unmanaged) http servers */ - for (final HttpServletServer httpServer : this.httpServers) { - try { - if (!httpServer.waitedStart(10 * 1000L)) { - success = false; - } - } catch (final Exception e) { - logger.error("{}: cannot start http-server {} because of {}", this, httpServer, e.getMessage(), e); - } - } + attempt(success, this.httpServers, + httpServer -> httpServer.waitedStart(10 * 1000L), + (item, ex) -> logger.error("{}: cannot start http-server {} because of {}", + this, item, ex.getMessage(), ex)); /* Start Policy Engine exclusively-owned (unmanaged) sources */ - for (final TopicSource source : this.sources) { - try { - if (!source.start()) { - success = false; - } - } catch (final Exception e) { - logger.error("{}: cannot start topic-source {} because of {}", this, source, e.getMessage(), e); - } - } + attempt(success, this.sources, + TopicSource::start, + (item, ex) -> logger.error("{}: cannot start topic-source {} because of {}", + this, item, ex.getMessage(), ex)); /* Start Policy Engine owned (unmanaged) sinks */ - for (final TopicSink sink : this.sinks) { - try { - if (!sink.start()) { - success = false; - } - } catch (final Exception e) { - logger.error("{}: cannot start topic-sink {} because of {}", this, sink, e.getMessage(), e); - } - } + attempt(success, this.sinks, + TopicSink::start, + (item, ex) -> logger.error("{}: cannot start topic-sink {} because of {}", + this, item, ex.getMessage(), ex)); /* Start Policy Controllers */ - final List<PolicyController> controllers = getControllerFactory().inventory(); - for (final PolicyController controller : controllers) { - try { - if (!controller.start()) { - success = false; - } - } catch (final Exception e) { - logger.error("{}: cannot start policy-controller {} because of {}", this, controller, e.getMessage(), - e); - success = false; - } - } + attempt(success, getControllerFactory().inventory(), + PolicyController::start, + (item, ex) -> { + logger.error("{}: cannot start policy-controller {} because of {}", this, item, + ex.getMessage(), ex); + success.set(false); + }); /* Start managed Topic Endpoints */ try { if (!getTopicEndpointManager().start()) { - success = false; + success.set(false); } } catch (final IllegalStateException e) { logger.warn("{}: Topic Endpoint Manager is in an invalid state because of {}", this, e.getMessage(), e); @@ -570,109 +546,80 @@ class PolicyEngineManager implements PolicyEngine { startPdpJmxListener(); /* policy-engine dispatch after start hook */ - for (final PolicyEngineFeatureApi feature : getEngineProviders()) { - try { - if (feature.afterStart(this)) { - return success; - } - } catch (final Exception e) { - logger.error("{}: feature {} after-start failure because of {}", this, feature.getClass().getName(), - e.getMessage(), e); - } - } + FeatureApiUtils.apply(getEngineProviders(), + feature -> feature.afterStart(this), + (feature, ex) -> logger.error("{}: feature {} after-start failure because of {}", this, + feature.getClass().getName(), ex.getMessage(), ex)); - return success; + return success.get(); + } + + @FunctionalInterface + private static interface PredicateWithEx<T> { + public boolean test(T value) throws InterruptedException; } @Override public synchronized boolean stop() { /* policy-engine dispatch pre stop hook */ - for (final PolicyEngineFeatureApi feature : getEngineProviders()) { - try { - if (feature.beforeStop(this)) { - return true; - } - } catch (final Exception e) { - logger.error("{}: feature {} before-stop failure because of {}", this, feature.getClass().getName(), - e.getMessage(), e); - } + if (FeatureApiUtils.apply(getEngineProviders(), + feature -> feature.beforeStop(this), + (feature, ex) -> logger.error("{}: feature {} before-stop failure because of {}", this, + feature.getClass().getName(), ex.getMessage(), ex))) { + return true; } /* stop regardless of the lock state */ - boolean success = true; if (!this.alive) { return true; } this.alive = false; - final List<PolicyController> controllers = getControllerFactory().inventory(); - for (final PolicyController controller : controllers) { - try { - if (!controller.stop()) { - success = false; - } - } catch (final Exception e) { - logger.error("{}: cannot stop policy-controller {} because of {}", this, controller, e.getMessage(), e); - success = false; - } - } + AtomicReference<Boolean> success = new AtomicReference<>(true); + + attempt(success, getControllerFactory().inventory(), + PolicyController::stop, + (item, ex) -> { + logger.error("{}: cannot stop policy-controller {} because of {}", this, item, + ex.getMessage(), ex); + success.set(false); + }); /* Stop Policy Engine owned (unmanaged) sources */ - for (final TopicSource source : this.sources) { - try { - if (!source.stop()) { - success = false; - } - } catch (final Exception e) { - logger.error("{}: cannot start topic-source {} because of {}", this, source, e.getMessage(), e); - } - } + attempt(success, this.sources, + TopicSource::stop, + (item, ex) -> logger.error("{}: cannot stop topic-source {} because of {}", this, item, + ex.getMessage(), ex)); /* Stop Policy Engine owned (unmanaged) sinks */ - for (final TopicSink sink : this.sinks) { - try { - if (!sink.stop()) { - success = false; - } - } catch (final Exception e) { - logger.error("{}: cannot start topic-sink {} because of {}", this, sink, e.getMessage(), e); - } - } + attempt(success, this.sinks, + TopicSink::stop, + (item, ex) -> logger.error("{}: cannot stop topic-sink {} because of {}", this, item, + ex.getMessage(), ex)); /* stop all managed topics sources and sinks */ if (!getTopicEndpointManager().stop()) { - success = false; + success.set(false); } /* stop all unmanaged http servers */ - for (final HttpServletServer httpServer : this.httpServers) { - try { - if (!httpServer.stop()) { - success = false; - } - } catch (final Exception e) { - logger.error("{}: cannot start http-server {} because of {}", this, httpServer, e.getMessage(), e); - } - } + attempt(success, this.httpServers, + HttpServletServer::stop, + (item, ex) -> logger.error("{}: cannot stop http-server {} because of {}", this, item, + ex.getMessage(), ex)); // stop JMX? /* policy-engine dispatch pre stop hook */ - for (final PolicyEngineFeatureApi feature : getEngineProviders()) { - try { - if (feature.afterStop(this)) { - return success; - } - } catch (final Exception e) { - logger.error("{}: feature {} after-stop failure because of {}", this, feature.getClass().getName(), - e.getMessage(), e); - } - } + FeatureApiUtils.apply(getEngineProviders(), + feature -> feature.afterStop(this), + (feature, ex) -> logger.error("{}: feature {} after-stop failure because of {}", this, + feature.getClass().getName(), ex.getMessage(), ex)); - return success; + return success.get(); } @Override @@ -687,36 +634,26 @@ class PolicyEngineManager implements PolicyEngine { exitThread.start(); /* policy-engine dispatch pre shutdown hook */ - for (final PolicyEngineFeatureApi feature : getEngineProviders()) { - try { - if (feature.beforeShutdown(this)) { - return; - } - } catch (final Exception e) { - logger.error("{}: feature {} before-shutdown failure because of {}", this, feature.getClass().getName(), - e.getMessage(), e); - } + if (FeatureApiUtils.apply(getEngineProviders(), + feature -> feature.beforeShutdown(this), + (feature, ex) -> logger.error("{}: feature {} before-shutdown failure because of {}", this, + feature.getClass().getName(), ex.getMessage(), ex))) { + return; } this.alive = false; /* Shutdown Policy Engine owned (unmanaged) sources */ - for (final TopicSource source : this.sources) { - try { - source.shutdown(); - } catch (final Exception e) { - logger.error("{}: cannot shutdown topic-source {} because of {}", this, source, e.getMessage(), e); - } - } + applyAll(this.sources, + TopicSource::shutdown, + (item, ex) -> logger.error("{}: cannot shutdown topic-source {} because of {}", this, item, + ex.getMessage(), ex)); /* Shutdown Policy Engine owned (unmanaged) sinks */ - for (final TopicSink sink : this.sinks) { - try { - sink.shutdown(); - } catch (final Exception e) { - logger.error("{}: cannot shutdown topic-sink {} because of {}", this, sink, e.getMessage(), e); - } - } + applyAll(this.sinks, + TopicSink::shutdown, + (item, ex) -> logger.error("{}: cannot shutdown topic-sink {} because of {}", this, item, + ex.getMessage(), ex)); /* Shutdown managed resources */ getControllerFactory().shutdown(); @@ -728,19 +665,45 @@ class PolicyEngineManager implements PolicyEngine { stopPdpJmxListener(); /* policy-engine dispatch post shutdown hook */ - for (final PolicyEngineFeatureApi feature : getEngineProviders()) { + FeatureApiUtils.apply(getEngineProviders(), + feature -> feature.afterShutdown(this), + (feature, ex) -> logger.error("{}: feature {} after-shutdown failure because of {}", this, + feature.getClass().getName(), ex.getMessage(), ex)); + + exitThread.interrupt(); + logger.info("{}: normal termination", this); + } + + private <T> void attempt(AtomicReference<Boolean> success, List<T> items, PredicateWithEx<T> pred, + BiConsumer<T,Exception> handleEx) { + + for (T item : items) { try { - if (feature.afterShutdown(this)) { - return; + if (!pred.test(item)) { + success.set(false); } - } catch (final Exception e) { - logger.error("{}: feature {} after-shutdown failure because of {}", this, feature.getClass().getName(), - e.getMessage(), e); + + } catch (InterruptedException ex) { + handleEx.accept(item, ex); + Thread.currentThread().interrupt(); + + } catch (RuntimeException ex) { + handleEx.accept(item, ex); } } + } - exitThread.interrupt(); - logger.info("{}: normal termination", this); + private <T> void applyAll(List<T> items, Consumer<T> function, + BiConsumer<T,Exception> handleEx) { + + for (T item : items) { + try { + function.accept(item); + + } catch (RuntimeException ex) { + handleEx.accept(item, ex); + } + } } /** @@ -764,14 +727,10 @@ class PolicyEngineManager implements PolicyEngine { /* * shut down the Policy Engine owned http servers as the very last thing */ - for (final HttpServletServer httpServer : PolicyEngineManager.this.getHttpServers()) { - try { - httpServer.shutdown(); - } catch (final Exception e) { - logger.error("{}: cannot shutdown http-server {} because of {}", PolicyEngineManager.this, - httpServer, e.getMessage(), e); - } - } + applyAll(PolicyEngineManager.this.getHttpServers(), + HttpServletServer::shutdown, + (item, ex) -> logger.error("{}: cannot shutdown http-server {} because of {}", this, item, + ex.getMessage(), ex)); logger.info("{}: exit", PolicyEngineManager.this); doExit(0); @@ -790,23 +749,14 @@ class PolicyEngineManager implements PolicyEngine { } @Override - public boolean isAlive() { - return this.alive; - } - - @Override public synchronized boolean lock() { /* policy-engine dispatch pre lock hook */ - for (final PolicyEngineFeatureApi feature : getEngineProviders()) { - try { - if (feature.beforeLock(this)) { - return true; - } - } catch (final Exception e) { - logger.error("{}: feature {} before-lock failure because of {}", this, feature.getClass().getName(), - e.getMessage(), e); - } + if (FeatureApiUtils.apply(getEngineProviders(), + feature -> feature.beforeLock(this), + (feature, ex) -> logger.error("{}: feature {} before-lock failure because of {}", this, + feature.getClass().getName(), ex.getMessage(), ex))) { + return true; } if (this.locked) { @@ -829,16 +779,10 @@ class PolicyEngineManager implements PolicyEngine { success = getTopicEndpointManager().lock() && success; /* policy-engine dispatch post lock hook */ - for (final PolicyEngineFeatureApi feature : getEngineProviders()) { - try { - if (feature.afterLock(this)) { - return success; - } - } catch (final Exception e) { - logger.error("{}: feature {} after-lock failure because of {}", this, feature.getClass().getName(), - e.getMessage(), e); - } - } + FeatureApiUtils.apply(getEngineProviders(), + feature -> feature.afterLock(this), + (feature, ex) -> logger.error("{}: feature {} after-lock failure because of {}", this, + feature.getClass().getName(), ex.getMessage(), ex)); return success; } @@ -847,15 +791,11 @@ class PolicyEngineManager implements PolicyEngine { public synchronized boolean unlock() { /* policy-engine dispatch pre unlock hook */ - for (final PolicyEngineFeatureApi feature : getEngineProviders()) { - try { - if (feature.beforeUnlock(this)) { - return true; - } - } catch (final Exception e) { - logger.error("{}: feature {} before-unlock failure because of {}", this, feature.getClass().getName(), - e.getMessage(), e); - } + if (FeatureApiUtils.apply(getEngineProviders(), + feature -> feature.beforeUnlock(this), + (feature, ex) -> logger.error("{}: feature {} before-unlock failure because of {}", this, + feature.getClass().getName(), ex.getMessage(), ex))) { + return true; } if (!this.locked) { @@ -879,26 +819,15 @@ class PolicyEngineManager implements PolicyEngine { success = getTopicEndpointManager().unlock() && success; /* policy-engine dispatch after unlock hook */ - for (final PolicyEngineFeatureApi feature : getEngineProviders()) { - try { - if (feature.afterUnlock(this)) { - return success; - } - } catch (final Exception e) { - logger.error("{}: feature {} after-unlock failure because of {}", this, feature.getClass().getName(), - e.getMessage(), e); - } - } + FeatureApiUtils.apply(getEngineProviders(), + feature -> feature.afterUnlock(this), + (feature, ex) -> logger.error("{}: feature {} after-unlock failure because of {}", this, + feature.getClass().getName(), ex.getMessage(), ex)); return success; } @Override - public boolean isLocked() { - return this.locked; - } - - @Override public void removePolicyController(String name) { getControllerFactory().destroy(name); } @@ -933,24 +862,6 @@ class PolicyEngineManager implements PolicyEngine { return this.properties; } - - @SuppressWarnings("unchecked") - @Override - public List<TopicSource> getSources() { - return (List<TopicSource>) this.sources; - } - - @SuppressWarnings("unchecked") - @Override - public List<TopicSink> getSinks() { - return (List<TopicSink>) this.sinks; - } - - @Override - public List<HttpServletServer> getHttpServers() { - return this.httpServers; - } - @Override public List<String> getFeatures() { final List<String> features = new ArrayList<>(); @@ -985,15 +896,12 @@ class PolicyEngineManager implements PolicyEngine { @Override public void onTopicEvent(CommInfrastructure commType, String topic, String event) { /* policy-engine pre topic event hook */ - for (final PolicyEngineFeatureApi feature : getFeatureProviders()) { - try { - if (feature.beforeOnTopicEvent(this, commType, topic, event)) { - return; - } - } catch (final Exception e) { - logger.error("{}: feature {} beforeOnTopicEvent failure on event {} because of {}", this, - feature.getClass().getName(), event, e.getMessage(), e); - } + if (FeatureApiUtils.apply(getFeatureProviders(), + feature -> feature.beforeOnTopicEvent(this, commType, topic, event), + (feature, ex) -> logger.error( + "{}: feature {} beforeOnTopicEvent failure on event {} because of {}", this, + feature.getClass().getName(), event, ex.getMessage(), ex))) { + return; } /* configuration request */ @@ -1006,16 +914,11 @@ class PolicyEngineManager implements PolicyEngine { } /* policy-engine after topic event hook */ - for (final PolicyEngineFeatureApi feature : getFeatureProviders()) { - try { - if (feature.afterOnTopicEvent(this, configuration, commType, topic, event)) { - return; - } - } catch (final Exception e) { - logger.error("{}: feature {} afterOnTopicEvent failure on event {} because of {}", this, - feature.getClass().getName(), event, e.getMessage(), e); - } - } + PdpdConfiguration configuration2 = configuration; + FeatureApiUtils.apply(getFeatureProviders(), + feature -> feature.afterOnTopicEvent(this, configuration2, commType, topic, event), + (feature, ex) -> logger.error("{}: feature {} afterOnTopicEvent failure on event {} because of {}", this, + feature.getClass().getName(), event, ex.getMessage(), ex)); } @Override @@ -1041,7 +944,7 @@ class PolicyEngineManager implements PolicyEngine { throw new IllegalStateException(ENGINE_LOCKED_MSG); } - final List<? extends TopicSink> topicSinks = getTopicEndpointManager().getTopicSinks(topic); + final List<TopicSink> topicSinks = getTopicEndpointManager().getTopicSinks(topic); if (topicSinks == null || topicSinks.size() != 1) { throw new IllegalStateException("Cannot ensure correct delivery on topic " + topic + ": " + topicSinks); } @@ -1056,11 +959,11 @@ class PolicyEngineManager implements PolicyEngine { * Note this entry point is usually from the DRL (one of the reasons busType is String. */ - if (busType == null || busType.isEmpty()) { + if (StringUtils.isBlank(busType)) { throw new IllegalArgumentException("Invalid Communication Infrastructure"); } - if (topic == null || topic.isEmpty()) { + if (StringUtils.isBlank(topic)) { throw new IllegalArgumentException(INVALID_TOPIC_MSG); } @@ -1068,12 +971,8 @@ class PolicyEngineManager implements PolicyEngine { throw new IllegalArgumentException(INVALID_EVENT_MSG); } - boolean valid = false; - for (final Topic.CommInfrastructure comm : Topic.CommInfrastructure.values()) { - if (comm.name().equals(busType)) { - valid = true; - } - } + boolean valid = Stream.of(Topic.CommInfrastructure.values()).map(Enum::name) + .anyMatch(name -> name.equals(busType)); if (!valid) { throw new IllegalArgumentException("Invalid Communication Infrastructure: " + busType); @@ -1181,15 +1080,11 @@ class PolicyEngineManager implements PolicyEngine { public synchronized void activate() { /* policy-engine dispatch pre activate hook */ - for (final PolicyEngineFeatureApi feature : getEngineProviders()) { - try { - if (feature.beforeActivate(this)) { - return; - } - } catch (final Exception e) { - logger.error("{}: feature {} before-activate failure because of {}", this, feature.getClass().getName(), - e.getMessage(), e); - } + if (FeatureApiUtils.apply(getEngineProviders(), + feature -> feature.beforeActivate(this), + (feature, ex) -> logger.error("{}: feature {} before-activate failure because of {}", this, + feature.getClass().getName(), ex.getMessage(), ex))) { + return; } // activate 'policy-management' @@ -1209,31 +1104,21 @@ class PolicyEngineManager implements PolicyEngine { this.unlock(); /* policy-engine dispatch post activate hook */ - for (final PolicyEngineFeatureApi feature : getEngineProviders()) { - try { - if (feature.afterActivate(this)) { - return; - } - } catch (final Exception e) { - logger.error("{}: feature {} after-activate failure because of {}", this, feature.getClass().getName(), - e.getMessage(), e); - } - } + FeatureApiUtils.apply(getEngineProviders(), + feature -> feature.afterActivate(this), + (feature, ex) -> logger.error("{}: feature {} after-activate failure because of {}", this, + feature.getClass().getName(), ex.getMessage(), ex)); } @Override public synchronized void deactivate() { /* policy-engine dispatch pre deactivate hook */ - for (final PolicyEngineFeatureApi feature : getEngineProviders()) { - try { - if (feature.beforeDeactivate(this)) { - return; - } - } catch (final Exception e) { - logger.error("{}: feature {} before-deactivate failure because of {}", this, - feature.getClass().getName(), e.getMessage(), e); - } + if (FeatureApiUtils.apply(getEngineProviders(), + feature -> feature.beforeDeactivate(this), + (feature, ex) -> logger.error("{}: feature {} before-deactivate failure because of {}", this, + feature.getClass().getName(), ex.getMessage(), ex))) { + return; } this.lock(); @@ -1248,16 +1133,10 @@ class PolicyEngineManager implements PolicyEngine { } /* policy-engine dispatch post deactivate hook */ - for (final PolicyEngineFeatureApi feature : getEngineProviders()) { - try { - if (feature.afterDeactivate(this)) { - return; - } - } catch (final Exception e) { - logger.error("{}: feature {} after-deactivate failure because of {}", this, - feature.getClass().getName(), e.getMessage(), e); - } - } + FeatureApiUtils.apply(getEngineProviders(), + feature -> feature.afterDeactivate(this), + (feature, ex) -> logger.error("{}: feature {} after-deactivate failure because of {}", this, + feature.getClass().getName(), ex.getMessage(), ex)); } private boolean controllerConfig(PdpdConfiguration config) { @@ -1269,12 +1148,8 @@ class PolicyEngineManager implements PolicyEngine { } final List<PolicyController> policyControllers = this.updatePolicyControllers(config.getControllers()); - boolean success = false; - if (!(policyControllers == null || policyControllers.isEmpty()) - && (policyControllers.size() == configControllers.size())) { - success = true; - } - return success; + return (policyControllers != null && !policyControllers.isEmpty() + && policyControllers.size() == configControllers.size()); } @Override diff --git a/policy-management/src/main/java/org/onap/policy/drools/system/internal/AggregatedPolicyController.java b/policy-management/src/main/java/org/onap/policy/drools/system/internal/AggregatedPolicyController.java index 5d915104..aa57abaf 100644 --- a/policy-management/src/main/java/org/onap/policy/drools/system/internal/AggregatedPolicyController.java +++ b/policy-management/src/main/java/org/onap/policy/drools/system/internal/AggregatedPolicyController.java @@ -34,6 +34,7 @@ import org.onap.policy.common.endpoints.event.comm.TopicListener; import org.onap.policy.common.endpoints.event.comm.TopicSink; import org.onap.policy.common.endpoints.event.comm.TopicSource; import org.onap.policy.common.gson.annotation.GsonJsonIgnore; +import org.onap.policy.common.utils.services.FeatureApiUtils; import org.onap.policy.drools.controller.DroolsController; import org.onap.policy.drools.controller.DroolsControllerConstants; import org.onap.policy.drools.controller.DroolsControllerFactory; @@ -54,6 +55,9 @@ import org.slf4j.LoggerFactory; */ public class AggregatedPolicyController implements PolicyController, TopicListener { + private static final String BEFORE_OFFER_FAILURE = "{}: feature {} before-offer failure because of {}"; + private static final String AFTER_OFFER_FAILURE = "{}: feature {} after-offer failure because of {}"; + /** * Logger. */ @@ -67,12 +71,12 @@ public class AggregatedPolicyController implements PolicyController, TopicListen /** * Abstracted Event Sources List regardless communication technology. */ - private final List<? extends TopicSource> sources; + private final List<TopicSource> sources; /** * Abstracted Event Sinks List regardless communication technology. */ - private final List<? extends TopicSink> sinks; + private final List<TopicSink> sinks; /** * Mapping topics to sinks. @@ -273,15 +277,11 @@ public class AggregatedPolicyController implements PolicyController, TopicListen public boolean start() { logger.info("{}: start", this); - for (PolicyControllerFeatureApi feature : getProviders()) { - try { - if (feature.beforeStart(this)) { - return true; - } - } catch (Exception e) { - logger.error("{}: feature {} before-start failure because of {}", this, feature.getClass().getName(), - e.getMessage(), e); - } + if (FeatureApiUtils.apply(getProviders(), + feature -> feature.beforeStart(this), + (feature, ex) -> logger.error("{}: feature {} before-start failure because of {}", this, + feature.getClass().getName(), ex.getMessage(), ex))) { + return true; } if (this.isLocked()) { @@ -312,16 +312,10 @@ public class AggregatedPolicyController implements PolicyController, TopicListen } } - for (PolicyControllerFeatureApi feature : getProviders()) { - try { - if (feature.afterStart(this)) { - return true; - } - } catch (Exception e) { - logger.error("{}: feature {} after-start failure because of {}", this, feature.getClass().getName(), - e.getMessage(), e); - } - } + FeatureApiUtils.apply(getProviders(), + feature -> feature.afterStart(this), + (feature, ex) -> logger.error("{}: feature {} after-start failure because of {}", this, + feature.getClass().getName(), ex.getMessage(), ex)); return success; } @@ -333,15 +327,11 @@ public class AggregatedPolicyController implements PolicyController, TopicListen public boolean stop() { logger.info("{}: stop", this); - for (PolicyControllerFeatureApi feature : getProviders()) { - try { - if (feature.beforeStop(this)) { - return true; - } - } catch (Exception e) { - logger.error("{}: feature {} before-stop failure because of {}", this, feature.getClass().getName(), - e.getMessage(), e); - } + if (FeatureApiUtils.apply(getProviders(), + feature -> feature.beforeStop(this), + (feature, ex) -> logger.error("{}: feature {} before-stop failure because of {}", this, + feature.getClass().getName(), ex.getMessage(), ex))) { + return true; } /* stop regardless locked state */ @@ -362,16 +352,10 @@ public class AggregatedPolicyController implements PolicyController, TopicListen boolean success = this.droolsController.stop(); - for (PolicyControllerFeatureApi feature : getProviders()) { - try { - if (feature.afterStop(this)) { - return true; - } - } catch (Exception e) { - logger.error("{}: feature {} after-stop failure because of {}", this, feature.getClass().getName(), - e.getMessage(), e); - } - } + FeatureApiUtils.apply(getProviders(), + feature -> feature.afterStop(this), + (feature, ex) -> logger.error("{}: feature {} after-stop failure because of {}", this, + feature.getClass().getName(), ex.getMessage(), ex)); return success; } @@ -383,31 +367,21 @@ public class AggregatedPolicyController implements PolicyController, TopicListen public void shutdown() { logger.info("{}: shutdown", this); - for (PolicyControllerFeatureApi feature : getProviders()) { - try { - if (feature.beforeShutdown(this)) { - return; - } - } catch (Exception e) { - logger.error("{}: feature {} before-shutdown failure because of {}", this, feature.getClass().getName(), - e.getMessage(), e); - } + if (FeatureApiUtils.apply(getProviders(), + feature -> feature.beforeShutdown(this), + (feature, ex) -> logger.error("{}: feature {} before-shutdown failure because of {}", this, + feature.getClass().getName(), ex.getMessage(), ex))) { + return; } this.stop(); getDroolsFactory().shutdown(this.droolsController); - for (PolicyControllerFeatureApi feature : getProviders()) { - try { - if (feature.afterShutdown(this)) { - return; - } - } catch (Exception e) { - logger.error("{}: feature {} after-shutdown failure because of {}", this, feature.getClass().getName(), - e.getMessage(), e); - } - } + FeatureApiUtils.apply(getProviders(), + feature -> feature.afterShutdown(this), + (feature, ex) -> logger.error("{}: feature {} after-shutdown failure because of {}", this, + feature.getClass().getName(), ex.getMessage(), ex)); } /** @@ -417,31 +391,21 @@ public class AggregatedPolicyController implements PolicyController, TopicListen public void halt() { logger.info("{}: halt", this); - for (PolicyControllerFeatureApi feature : getProviders()) { - try { - if (feature.beforeHalt(this)) { - return; - } - } catch (Exception e) { - logger.error("{}: feature {} before-halt failure because of {}", this, feature.getClass().getName(), - e.getMessage(), e); - } + if (FeatureApiUtils.apply(getProviders(), + feature -> feature.beforeHalt(this), + (feature, ex) -> logger.error("{}: feature {} before-halt failure because of {}", this, + feature.getClass().getName(), ex.getMessage(), ex))) { + return; } this.stop(); getDroolsFactory().destroy(this.droolsController); getPersistenceManager().deleteController(this.name); - for (PolicyControllerFeatureApi feature : getProviders()) { - try { - if (feature.afterHalt(this)) { - return; - } - } catch (Exception e) { - logger.error("{}: feature {} after-halt failure because of {}", this, feature.getClass().getName(), - e.getMessage(), e); - } - } + FeatureApiUtils.apply(getProviders(), + feature -> feature.afterHalt(this), + (feature, ex) -> logger.error("{}: feature {} after-halt failure because of {}", this, + feature.getClass().getName(), ex.getMessage(), ex)); } /** @@ -455,29 +419,19 @@ public class AggregatedPolicyController implements PolicyController, TopicListen return; } - for (PolicyControllerFeatureApi feature : getProviders()) { - try { - if (feature.beforeOffer(this, commType, topic, event)) { - return; - } - } catch (Exception e) { - logger.error("{}: feature {} before-offer failure because of {}", this, feature.getClass().getName(), - e.getMessage(), e); - } + if (FeatureApiUtils.apply(getProviders(), + feature -> feature.beforeOffer(this, commType, topic, event), + (feature, ex) -> logger.error(BEFORE_OFFER_FAILURE, this, + feature.getClass().getName(), ex.getMessage(), ex))) { + return; } boolean success = this.droolsController.offer(topic, event); - for (PolicyControllerFeatureApi feature : getProviders()) { - try { - if (feature.afterOffer(this, commType, topic, event, success)) { - return; - } - } catch (Exception e) { - logger.error("{}: feature {} after-offer failure because of {}", this, feature.getClass().getName(), - e.getMessage(), e); - } - } + FeatureApiUtils.apply(getProviders(), + feature -> feature.afterOffer(this, commType, topic, event, success), + (feature, ex) -> logger.error(AFTER_OFFER_FAILURE, this, + feature.getClass().getName(), ex.getMessage(), ex)); } @Override @@ -488,29 +442,19 @@ public class AggregatedPolicyController implements PolicyController, TopicListen return true; } - for (PolicyControllerFeatureApi feature : getProviders()) { - try { - if (feature.beforeOffer(this, event)) { - return true; - } - } catch (Exception e) { - logger.error("{}: feature {} before-offer failure because of {}", this, feature.getClass().getName(), - e.getMessage(), e); - } + if (FeatureApiUtils.apply(getProviders(), + feature -> feature.beforeOffer(this, event), + (feature, ex) -> logger.error(BEFORE_OFFER_FAILURE, this, + feature.getClass().getName(), ex.getMessage(), ex))) { + return true; } boolean success = this.droolsController.offer(event); - for (PolicyControllerFeatureApi feature : getProviders()) { - try { - if (feature.afterOffer(this, event, success)) { - return success; - } - } catch (Exception e) { - logger.error("{}: feature {} after-offer failure because of {}", this, feature.getClass().getName(), - e.getMessage(), e); - } - } + FeatureApiUtils.apply(getProviders(), + feature -> feature.afterOffer(this, event, success), + (feature, ex) -> logger.error(AFTER_OFFER_FAILURE, this, + feature.getClass().getName(), ex.getMessage(), ex)); return success; } @@ -527,15 +471,11 @@ public class AggregatedPolicyController implements PolicyController, TopicListen logger.debug("{}: deliver event to {}:{}: {}", this, commType, topic, event); - for (PolicyControllerFeatureApi feature : getProviders()) { - try { - if (feature.beforeDeliver(this, commType, topic, event)) { - return true; - } - } catch (Exception e) { - logger.error("{}: feature {} before-deliver failure because of {}", this, feature.getClass().getName(), - e.getMessage(), e); - } + if (FeatureApiUtils.apply(getProviders(), + feature -> feature.beforeDeliver(this, commType, topic, event), + (feature, ex) -> logger.error("{}: feature {} before-deliver failure because of {}", this, + feature.getClass().getName(), ex.getMessage(), ex))) { + return true; } if (topic == null || topic.isEmpty()) { @@ -562,16 +502,10 @@ public class AggregatedPolicyController implements PolicyController, TopicListen boolean success = this.droolsController.deliver(this.topic2Sinks.get(topic), event); - for (PolicyControllerFeatureApi feature : getProviders()) { - try { - if (feature.afterDeliver(this, commType, topic, event, success)) { - return success; - } - } catch (Exception e) { - logger.error("{}: feature {} after-deliver failure because of {}", this, feature.getClass().getName(), - e.getMessage(), e); - } - } + FeatureApiUtils.apply(getProviders(), + feature -> feature.afterDeliver(this, commType, topic, event, success), + (feature, ex) -> logger.error("{}: feature {} after-deliver failure because of {}", this, + feature.getClass().getName(), ex.getMessage(), ex)); return success; } @@ -591,15 +525,11 @@ public class AggregatedPolicyController implements PolicyController, TopicListen public boolean lock() { logger.info("{}: lock", this); - for (PolicyControllerFeatureApi feature : getProviders()) { - try { - if (feature.beforeLock(this)) { - return true; - } - } catch (Exception e) { - logger.error("{}: feature {} before-lock failure because of {}", this, feature.getClass().getName(), - e.getMessage(), e); - } + if (FeatureApiUtils.apply(getProviders(), + feature -> feature.beforeLock(this), + (feature, ex) -> logger.error("{}: feature {} before-lock failure because of {}", this, + feature.getClass().getName(), ex.getMessage(), ex))) { + return true; } synchronized (this) { @@ -615,16 +545,10 @@ public class AggregatedPolicyController implements PolicyController, TopicListen boolean success = this.droolsController.lock(); - for (PolicyControllerFeatureApi feature : getProviders()) { - try { - if (feature.afterLock(this)) { - return true; - } - } catch (Exception e) { - logger.error("{}: feature {} after-lock failure because of {}", this, feature.getClass().getName(), - e.getMessage(), e); - } - } + FeatureApiUtils.apply(getProviders(), + feature -> feature.afterLock(this), + (feature, ex) -> logger.error("{}: feature {} after-lock failure because of {}", this, + feature.getClass().getName(), ex.getMessage(), ex)); return success; } @@ -637,15 +561,11 @@ public class AggregatedPolicyController implements PolicyController, TopicListen logger.info("{}: unlock", this); - for (PolicyControllerFeatureApi feature : getProviders()) { - try { - if (feature.beforeUnlock(this)) { - return true; - } - } catch (Exception e) { - logger.error("{}: feature {} before-unlock failure because of {}", this, feature.getClass().getName(), - e.getMessage(), e); - } + if (FeatureApiUtils.apply(getProviders(), + feature -> feature.beforeUnlock(this), + (feature, ex) -> logger.error("{}: feature {} before-unlock failure because of {}", this, + feature.getClass().getName(), ex.getMessage(), ex))) { + return true; } synchronized (this) { @@ -658,16 +578,10 @@ public class AggregatedPolicyController implements PolicyController, TopicListen boolean success = this.droolsController.unlock(); - for (PolicyControllerFeatureApi feature : getProviders()) { - try { - if (feature.afterUnlock(this)) { - return true; - } - } catch (Exception e) { - logger.error("{}: feature {} after-unlock failure because of {}", this, feature.getClass().getName(), - e.getMessage(), e); - } - } + FeatureApiUtils.apply(getProviders(), + feature -> feature.afterUnlock(this), + (feature, ex) -> logger.error("{}: feature {} after-unlock failure because of {}", this, + feature.getClass().getName(), ex.getMessage(), ex)); return success; } @@ -684,7 +598,7 @@ public class AggregatedPolicyController implements PolicyController, TopicListen * {@inheritDoc}. */ @Override - public List<? extends TopicSource> getTopicSources() { + public List<TopicSource> getTopicSources() { return this.sources; } @@ -692,7 +606,7 @@ public class AggregatedPolicyController implements PolicyController, TopicListen * {@inheritDoc}. */ @Override - public List<? extends TopicSink> getTopicSinks() { + public List<TopicSink> getTopicSinks() { return this.sinks; } diff --git a/policy-management/src/test/java/org/onap/policy/drools/controller/internal/MavenDroolsControllerTest.java b/policy-management/src/test/java/org/onap/policy/drools/controller/internal/MavenDroolsControllerTest.java index 4c262775..0d8bdfab 100644 --- a/policy-management/src/test/java/org/onap/policy/drools/controller/internal/MavenDroolsControllerTest.java +++ b/policy-management/src/test/java/org/onap/policy/drools/controller/internal/MavenDroolsControllerTest.java @@ -22,7 +22,10 @@ package org.onap.policy.drools.controller.internal; import java.io.IOException; import java.nio.file.Paths; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import org.junit.Assert; +import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; import org.kie.api.builder.ReleaseId; @@ -42,13 +45,15 @@ public class MavenDroolsControllerTest { private static volatile ReleaseId releaseId; + private static volatile CountDownLatch running; + /** * Set up. * * @throws IOException throws an IO exception */ @BeforeClass - public static void setUp() throws IOException { + public static void setUpBeforeClass() throws IOException { releaseId = KieUtils.installArtifact(Paths.get(JUNIT_ECHO_KMODULE_PATH).toFile(), Paths.get(JUNIT_ECHO_KMODULE_POM_PATH).toFile(), @@ -56,6 +61,15 @@ public class MavenDroolsControllerTest { Paths.get(JUNIT_ECHO_KMODULE_DRL_PATH).toFile()); } + @Before + public void setUp() { + running = new CountDownLatch(1); + } + + public static void setRunning() { + running.countDown(); + } + @Test public void stop() throws InterruptedException { createDroolsController(10000L).stop(); @@ -106,8 +120,8 @@ public class MavenDroolsControllerTest { Assert.assertEquals(releaseId.getArtifactId(), controller.getContainer().getArtifactId()); Assert.assertEquals(releaseId.getVersion(), controller.getContainer().getVersion()); - /* courtesy timer to allow full initialization from local maven repository */ - Thread.sleep(courtesyStartTimeMs); + /* allow full initialization from local maven repository */ + Assert.assertTrue(running.await(courtesyStartTimeMs, TimeUnit.MILLISECONDS)); Assert.assertEquals(1, controller.getSessionNames().size()); Assert.assertEquals(JUNIT_ECHO_KSESSION, controller.getSessionNames().get(0)); diff --git a/policy-management/src/test/java/org/onap/policy/drools/protocol/coders/ProtocolCoderToolsetTest.java b/policy-management/src/test/java/org/onap/policy/drools/protocol/coders/ProtocolCoderToolsetTest.java index bd595725..7787a7b6 100644 --- a/policy-management/src/test/java/org/onap/policy/drools/protocol/coders/ProtocolCoderToolsetTest.java +++ b/policy-management/src/test/java/org/onap/policy/drools/protocol/coders/ProtocolCoderToolsetTest.java @@ -231,7 +231,7 @@ public class ProtocolCoderToolsetTest { Properties sinkConfig = new Properties(); sinkConfig.put(PolicyEndPointProperties.PROPERTY_NOOP_SINK_TOPICS, JUNIT_PROTOCOL_CODER_TOPIC); - final List<? extends TopicSink> noopTopics = TopicEndpointManager.getManager().addTopicSinks(sinkConfig); + final List<TopicSink> noopTopics = TopicEndpointManager.getManager().addTopicSinks(sinkConfig); Properties droolsControllerConfig = new Properties(); droolsControllerConfig.put(DroolsPropertyConstants.RULES_GROUPID, releaseId.getGroupId()); diff --git a/policy-management/src/test/java/org/onap/policy/drools/server/restful/test/RestManagerTest.java b/policy-management/src/test/java/org/onap/policy/drools/server/restful/test/RestManagerTest.java index 173c1738..237bd4df 100644 --- a/policy-management/src/test/java/org/onap/policy/drools/server/restful/test/RestManagerTest.java +++ b/policy-management/src/test/java/org/onap/policy/drools/server/restful/test/RestManagerTest.java @@ -171,13 +171,17 @@ public class RestManagerTest { * @throws InterruptedException Interrupted exception */ @AfterClass - public static void tearDown() throws IOException, InterruptedException { + public static void tearDown() throws IOException { + try { + client.close(); + } catch (IOException ex) { + logger.warn("cannot close HTTP client connection", ex); + } + /* Shutdown managed resources */ PolicyControllerConstants.getFactory().shutdown(); TopicEndpointManager.getManager().shutdown(); PolicyEngineConstants.getManager().stop(); - Thread.sleep(10000L); - client.close(); cleanUpWorkingDirs(); } diff --git a/policy-management/src/test/java/org/onap/policy/drools/system/PolicyEngineTest.java b/policy-management/src/test/java/org/onap/policy/drools/system/PolicyEngineTest.java index df1f6cca..997fc03e 100644 --- a/policy-management/src/test/java/org/onap/policy/drools/system/PolicyEngineTest.java +++ b/policy-management/src/test/java/org/onap/policy/drools/system/PolicyEngineTest.java @@ -20,6 +20,7 @@ package org.onap.policy.drools.system; +import static org.awaitility.Awaitility.await; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; @@ -29,6 +30,7 @@ import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; import java.util.Properties; +import java.util.concurrent.TimeUnit; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.FixMethodOrder; @@ -302,7 +304,6 @@ public class PolicyEngineTest { TopicEndpointManager.getManager().shutdown(); PolicyEngineConstants.getManager().stop(); - Thread.sleep(10000L); - assertFalse(PolicyEngineConstants.getManager().isAlive()); + await().atMost(10, TimeUnit.SECONDS).until(() -> !PolicyEngineConstants.getManager().isAlive()); } } diff --git a/policy-management/src/test/java/org/onap/policy/drools/system/internal/AggregatedPolicyControllerTest.java b/policy-management/src/test/java/org/onap/policy/drools/system/internal/AggregatedPolicyControllerTest.java index 6f09ab9b..695893d4 100644 --- a/policy-management/src/test/java/org/onap/policy/drools/system/internal/AggregatedPolicyControllerTest.java +++ b/policy-management/src/test/java/org/onap/policy/drools/system/internal/AggregatedPolicyControllerTest.java @@ -924,7 +924,7 @@ public class AggregatedPolicyControllerTest { // remaining methods should not have been invoked assertThatThrownBy(() -> verifyBefore.accept(prov2)).isInstanceOf(AssertionError.class); - assertThatThrownBy(() -> verifyMiddle.run()).isInstanceOf(AssertionError.class); + assertThatThrownBy(verifyMiddle::run).isInstanceOf(AssertionError.class); assertThatThrownBy(() -> verifyAfter.accept(prov1)).isInstanceOf(AssertionError.class); assertThatThrownBy(() -> verifyAfter.accept(prov2)).isInstanceOf(AssertionError.class); diff --git a/policy-management/src/test/resources/echo.drl b/policy-management/src/test/resources/echo.drl index 664df639..bd26f95b 100644 --- a/policy-management/src/test/resources/echo.drl +++ b/policy-management/src/test/resources/echo.drl @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * ONAP * ================================================================================ - * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved. + * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,10 +20,13 @@ package org.onap.policy.drools.test; +import org.onap.policy.drools.controller.internal.MavenDroolsControllerTest; + rule "INIT" lock-on-active when then + MavenDroolsControllerTest.setRunning(); insert(new String("hello,I am up")); end diff --git a/policy-utils/src/main/java/org/onap/policy/drools/utils/logging/MdcTransactionImpl.java b/policy-utils/src/main/java/org/onap/policy/drools/utils/logging/MdcTransactionImpl.java index f10343e2..ac90ab96 100644 --- a/policy-utils/src/main/java/org/onap/policy/drools/utils/logging/MdcTransactionImpl.java +++ b/policy-utils/src/main/java/org/onap/policy/drools/utils/logging/MdcTransactionImpl.java @@ -55,6 +55,7 @@ import java.time.Duration; import java.time.Instant; import java.util.Date; import java.util.UUID; +import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.slf4j.MDC; @@ -221,111 +222,45 @@ class MdcTransactionImpl implements MdcTransaction { */ @Override public MdcTransaction flush() { - if (this.requestId != null && !this.requestId.isEmpty()) { - MDC.put(REQUEST_ID, this.requestId); - } - - if (this.invocationId != null && !this.invocationId.isEmpty()) { - MDC.put(INVOCATION_ID, this.invocationId); - } - - if (this.partner != null) { - MDC.put(PARTNER_NAME, this.partner); - } - - if (this.virtualServerName != null) { - MDC.put(VIRTUAL_SERVER_NAME, this.virtualServerName); - } - - if (this.serverName != null) { - MDC.put(SERVER, this.serverName); - } - - if (this.serverIpAddress != null) { - MDC.put(SERVER_IP_ADDRESS, this.serverIpAddress); - } - - if (this.serverFqdn != null) { - MDC.put(SERVER_FQDN, this.serverFqdn); - } - - if (this.serviceName != null) { - MDC.put(SERVICE_NAME, this.serviceName); - } - - if (this.startTime != null) { - MDC.put(BEGIN_TIMESTAMP, timestamp(this.startTime)); - } - - if (this.endTime != null) { - MDC.put(END_TIMESTAMP, timestamp(this.endTime)); - } else { - this.setEndTime(null); - MDC.put(END_TIMESTAMP, timestamp(this.endTime)); - } + setMdc(REQUEST_ID, this.requestId); + setMdc(INVOCATION_ID, this.invocationId); + setMdc(PARTNER_NAME, this.partner); + setMdc(VIRTUAL_SERVER_NAME, this.virtualServerName); + setMdc(SERVER, this.serverName); + setMdc(SERVER_IP_ADDRESS, this.serverIpAddress); + setMdc(SERVER_FQDN, this.serverFqdn); + setMdc(SERVICE_NAME, this.serviceName); + setMdc(BEGIN_TIMESTAMP, timestamp(this.startTime)); + setMdc(END_TIMESTAMP, timestamp(this.endTime)); if (this.elapsedTime != null) { MDC.put(ELAPSED_TIME, String.valueOf(this.elapsedTime)); - } else { - if (endTime != null && startTime != null) { - this.elapsedTime = Duration.between(startTime, endTime).toMillis(); - MDC.put(ELAPSED_TIME, String.valueOf(this.elapsedTime)); - } - } - - if (this.serviceInstanceId != null) { - MDC.put(SERVICE_INSTANCE_ID, this.serviceInstanceId); - } - - if (this.instanceUuid != null) { - MDC.put(INSTANCE_UUID, this.instanceUuid); - } - - if (this.processKey != null) { - MDC.put(PROCESS_KEY, this.processKey); - } - - if (this.statusCode != null) { - MDC.put(STATUS_CODE, this.statusCode); - } - - if (this.responseCode != null) { - MDC.put(RESPONSE_CODE, this.responseCode); - } - - if (this.responseDescription != null) { - MDC.put(RESPONSE_DESCRIPTION, this.responseDescription); - } - - if (this.theSeverity != null) { - MDC.put(SEVERITY, this.theSeverity); - } - - if (this.alertSeverity != null) { - MDC.put(ALERT_SEVERITY, this.alertSeverity); - } - - if (this.targetEntity != null) { - MDC.put(TARGET_ENTITY, this.targetEntity); - } - - if (this.targetServiceName != null) { - MDC.put(TARGET_SERVICE_NAME, this.targetServiceName); + } else if (endTime != null && startTime != null) { + this.elapsedTime = Duration.between(startTime, endTime).toMillis(); + MDC.put(ELAPSED_TIME, String.valueOf(this.elapsedTime)); } - if (this.targetVirtualEntity != null) { - MDC.put(TARGET_VIRTUAL_ENTITY, this.targetVirtualEntity); - } + setMdc(SERVICE_INSTANCE_ID, this.serviceInstanceId); + setMdc(INSTANCE_UUID, this.instanceUuid); + setMdc(PROCESS_KEY, this.processKey); + setMdc(STATUS_CODE, this.statusCode); + setMdc(RESPONSE_CODE, this.responseCode); + setMdc(RESPONSE_DESCRIPTION, this.responseDescription); + setMdc(SEVERITY, this.theSeverity); + setMdc(ALERT_SEVERITY, this.alertSeverity); + setMdc(TARGET_ENTITY, this.targetEntity); + setMdc(TARGET_SERVICE_NAME, this.targetServiceName); + setMdc(TARGET_VIRTUAL_ENTITY, this.targetVirtualEntity); + setMdc(CLIENT_IP_ADDRESS, this.clientIpAddress); + setMdc(REMOTE_HOST, this.remoteHost); - if (this.clientIpAddress != null) { - MDC.put(CLIENT_IP_ADDRESS, this.clientIpAddress); - } + return this; + } - if (this.remoteHost != null) { - MDC.put(REMOTE_HOST, this.remoteHost); + private void setMdc(String paramName, String value) { + if (!StringUtils.isBlank(value)) { + MDC.put(paramName, value); } - - return this; } @Override @@ -689,6 +624,10 @@ class MdcTransactionImpl implements MdcTransaction { @Override public String timestamp(Instant time) { + if (time == null) { + return null; + } + return new SimpleDateFormat(DATE_FORMAT).format(Date.from(time)); } |