aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--feature-active-standby-management/src/main/java/org/onap/policy/drools/activestandby/PMStandbyStateChangeNotifier.java545
-rw-r--r--feature-healthcheck/src/main/java/org/onap/policy/drools/healthcheck/HealthCheck.java6
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/DmaapManager.java5
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingFeature.java6
-rw-r--r--feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/FeatureTest2.java22
-rw-r--r--feature-simulators/src/test/java/org/onap/policy/drools/simulators/DMaaPSimulatorTest.java7
-rw-r--r--feature-state-management/src/main/java/org/onap/policy/drools/statemanagement/DroolsPDPIntegrityMonitor.java4
-rw-r--r--policy-management/src/main/java/org/onap/policy/drools/server/restful/RestManager.java84
-rw-r--r--policy-management/src/main/java/org/onap/policy/drools/system/PolicyEngine.java25
-rw-r--r--policy-management/src/main/java/org/onap/policy/drools/system/internal/AggregatedPolicyController.java6
-rw-r--r--policy-management/src/test/java/org/onap/policy/drools/protocol/coders/EventProtocolCoderTest.java4
-rw-r--r--policy-management/src/test/java/org/onap/policy/drools/protocol/coders/ProtocolCoderToolsetTest.java4
-rw-r--r--policy-management/src/test/java/org/onap/policy/drools/server/restful/test/RestManagerTest.java20
-rw-r--r--policy-management/src/test/java/org/onap/policy/drools/system/test/PolicyEngineTest.java10
14 files changed, 364 insertions, 384 deletions
diff --git a/feature-active-standby-management/src/main/java/org/onap/policy/drools/activestandby/PMStandbyStateChangeNotifier.java b/feature-active-standby-management/src/main/java/org/onap/policy/drools/activestandby/PMStandbyStateChangeNotifier.java
index 8519cdbb..805c4b80 100644
--- a/feature-active-standby-management/src/main/java/org/onap/policy/drools/activestandby/PMStandbyStateChangeNotifier.java
+++ b/feature-active-standby-management/src/main/java/org/onap/policy/drools/activestandby/PMStandbyStateChangeNotifier.java
@@ -18,8 +18,8 @@
* ============LICENSE_END=========================================================
*/
-package org.onap.policy.drools.activestandby;
-
+package org.onap.policy.drools.activestandby;
+
/*
* Per MultiSite_v1-10.ppt:
*
@@ -45,9 +45,9 @@ import java.util.TimerTask;
import org.onap.policy.common.im.StateChangeNotifier;
import org.onap.policy.common.im.StateManagement;
+import org.onap.policy.drools.system.PolicyEngine;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.onap.policy.drools.system.PolicyEngine;
/*
* Some background:
@@ -66,278 +66,293 @@ import org.onap.policy.drools.system.PolicyEngine;
* the demote), but providingservice (as reset by the election handling logic) and conn.standDownPdp() would not get called!
*
* To fix this bug, we consolidated StandbyStateChangeNotifier and PMStandbyStateChangeNotifier, with the standDownPdp() always
- * being invoked prior to the ProxyTopicEndpointManager.getInstance().lock(). In this way, when the election handling logic is invoked
+ * being invoked prior to the TopicEndpoint.manager.lock(). In this way, when the election handling logic is invoked
* during the controller stoppages, the PDP is in hotstandby and the standdown occurs.
*
*/
public class PMStandbyStateChangeNotifier extends StateChangeNotifier {
- // get an instance of logger
- private static final Logger logger = LoggerFactory.getLogger(PMStandbyStateChangeNotifier.class);
- private Timer delayActivateTimer;
- private int pdpUpdateInterval;
- private boolean isWaitingForActivation;
- private long startTimeWaitingForActivationMs;
- private long waitInterval;
- private boolean isNowActivating;
- private String previousStandbyStatus;
- public static final String NONE = "none";
- public static final String UNSUPPORTED = "unsupported";
- public static final String HOTSTANDBY_OR_COLDSTANDBY = "hotstandby_or_coldstandby";
-
- public PMStandbyStateChangeNotifier(){
- pdpUpdateInterval = Integer.parseInt(ActiveStandbyProperties.getProperty(ActiveStandbyProperties.PDP_UPDATE_INTERVAL));
- isWaitingForActivation = false;
- startTimeWaitingForActivationMs = new Date().getTime();
- //delay the activate so the DesignatedWaiter can run twice - give it an extra 2 seconds
- waitInterval = 2*pdpUpdateInterval + 2000L;
- isNowActivating=false;
- previousStandbyStatus = PMStandbyStateChangeNotifier.NONE;
- }
+ // get an instance of logger
+ private static final Logger logger = LoggerFactory.getLogger(PMStandbyStateChangeNotifier.class);
+ private Timer delayActivateTimer;
+ private int pdpUpdateInterval;
+ private boolean isWaitingForActivation;
+ private long startTimeWaitingForActivationMs;
+ private long waitInterval;
+ private boolean isNowActivating;
+ private String previousStandbyStatus;
+ public static final String NONE = "none";
+ public static final String UNSUPPORTED = "unsupported";
+ public static final String HOTSTANDBY_OR_COLDSTANDBY = "hotstandby_or_coldstandby";
+
+ public PMStandbyStateChangeNotifier() {
+ pdpUpdateInterval =
+ Integer.parseInt(ActiveStandbyProperties.getProperty(ActiveStandbyProperties.PDP_UPDATE_INTERVAL));
+ isWaitingForActivation = false;
+ startTimeWaitingForActivationMs = new Date().getTime();
+ // delay the activate so the DesignatedWaiter can run twice - give it an extra 2 seconds
+ waitInterval = 2 * pdpUpdateInterval + 2000L;
+ isNowActivating = false;
+ previousStandbyStatus = PMStandbyStateChangeNotifier.NONE;
+ }
+
+ @Override
+ public void handleStateChange() {
+ /*
+ * A note on synchronization: This method is not synchronized because the caller,
+ * stateManagememt, has synchronize all of its methods. Only one stateManagement operation
+ * can occur at a time. Thus, only one handleStateChange() call will ever be made at a time.
+ */
+ if (logger.isDebugEnabled()) {
+ logger.debug("handleStateChange: Entering, message={}, standbyStatus={}", super.getMessage(),
+ super.getStateManagement().getStandbyStatus());
+ }
+ String standbyStatus = super.getStateManagement().getStandbyStatus();
+ String pdpId = ActiveStandbyProperties.getProperty(ActiveStandbyProperties.NODE_NAME);
+
+ if (logger.isDebugEnabled()) {
+ logger.debug("handleStateChange: previousStandbyStatus = {}" + "; standbyStatus = {}",
+ previousStandbyStatus, standbyStatus);
+ }
+
+ if (standbyStatus == null || standbyStatus.equals(StateManagement.NULL_VALUE)) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("handleStateChange: standbyStatus is null; standing down PDP={}", pdpId);
+ }
+ if (previousStandbyStatus.equals(StateManagement.NULL_VALUE)) {
+ // We were just here and did this successfully
+ if (logger.isDebugEnabled()) {
+ logger.debug(
+ "handleStateChange: Is returning because standbyStatus is null and was previously 'null'; PDP={}",
+ pdpId);
+ }
+ return;
+ }
+ isWaitingForActivation = false;
+ try {
+ try {
+ if (logger.isDebugEnabled()) {
+ logger.debug("handleStateChange: null: cancelling delayActivationTimer.");
+ }
+ delayActivateTimer.cancel();
+ } catch (Exception e) {
+ if (logger.isInfoEnabled()) {
+ logger.info("handleStateChange: null no delayActivationTimer existed.", e);
+ }
+ // If you end of here, there was no active timer
+ }
+ // Only want to lock the endpoints, not the controllers.
+ PolicyEngine.manager.deactivate();
+ // The operation was fully successful, but you cannot assign it a real null value
+ // because later we might try to execute previousStandbyStatus.equals() and get
+ // a null pointer exception.
+ previousStandbyStatus = StateManagement.NULL_VALUE;
+ } catch (Exception e) {
+ logger.warn("handleStateChange: standbyStatus == null caught exception: ", e);
+ }
+ } else if (standbyStatus.equals(StateManagement.HOT_STANDBY)
+ || standbyStatus.equals(StateManagement.COLD_STANDBY)) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("handleStateChange: standbyStatus={}; standing down PDP={}", standbyStatus, pdpId);
+ }
+ if (previousStandbyStatus.equals(PMStandbyStateChangeNotifier.HOTSTANDBY_OR_COLDSTANDBY)) {
+ // We were just here and did this successfully
+ if (logger.isDebugEnabled()) {
+ logger.debug("handleStateChange: Is returning because standbyStatus is {}"
+ + " and was previously {}; PDP= {}", standbyStatus, previousStandbyStatus, pdpId);
+ }
+ return;
+ }
+ isWaitingForActivation = false;
+ try {
+ try {
+ if (logger.isDebugEnabled()) {
+ logger.debug(
+ "handleStateChange: HOT_STNDBY || COLD_STANDBY: cancelling delayActivationTimer.");
+ }
+ delayActivateTimer.cancel();
+ } catch (Exception e) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("handleStateChange: HOT_STANDBY || COLD_STANDBY no delayActivationTimer existed.",
+ e);
+ }
+ // If you end of here, there was no active timer
+ }
+ // Only want to lock the endpoints, not the controllers.
+ PolicyEngine.manager.deactivate();
+ // The operation was fully successful
+ previousStandbyStatus = PMStandbyStateChangeNotifier.HOTSTANDBY_OR_COLDSTANDBY;
+ } catch (Exception e) {
+ logger.warn("handleStateChange: standbyStatus = {} caught exception: {}", standbyStatus, e.getMessage(),
+ e);
+ }
+
+ } else if (standbyStatus.equals(StateManagement.PROVIDING_SERVICE)) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("handleStateChange: standbyStatus= {} " + "scheduling activation of PDP={}", standbyStatus,
+ pdpId);
+ }
+ if (previousStandbyStatus.equals(StateManagement.PROVIDING_SERVICE)) {
+ // We were just here and did this successfully
+ if (logger.isDebugEnabled()) {
+ logger.debug("handleStateChange: Is returning because standbyStatus is {}"
+ + "and was previously {}; PDP={}", standbyStatus, previousStandbyStatus, pdpId);
+ }
+ return;
+ }
+ try {
+ // UnLock all the endpoints
+ if (logger.isDebugEnabled()) {
+ logger.debug("handleStateChange: standbyStatus={}; controllers must be unlocked.", standbyStatus);
+ }
+ /*
+ * Only endpoints should be unlocked. Controllers have not been locked. Because,
+ * sometimes, it is possible for more than one PDP-D to become active (race
+ * conditions) we need to delay the activation of the topic endpoint interfaces to
+ * give the election algorithm time to resolve the conflict.
+ */
+ if (logger.isDebugEnabled()) {
+ logger.debug("handleStateChange: PROVIDING_SERVICE isWaitingForActivation= {}",
+ isWaitingForActivation);
+ }
+
+ // Delay activation for 2*pdpUpdateInterval+2000 ms in case of an election handler
+ // conflict.
+ // You could have multiple election handlers thinking they can take over.
+
+ // First let's check that the timer has not died
+ if (isWaitingForActivation) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("handleStateChange: PROVIDING_SERVICE isWaitingForActivation = {}",
+ isWaitingForActivation);
+ }
+ long now = new Date().getTime();
+ long waitTimeMs = now - startTimeWaitingForActivationMs;
+ if (waitTimeMs > 3 * waitInterval) {
+ if (logger.isDebugEnabled()) {
+ logger.debug(
+ "handleStateChange: PROVIDING_SERVICE looks like the activation wait timer may be hung,"
+ + " waitTimeMs = {} and allowable waitInterval = {}"
+ + " Checking whether it is currently in activation. isNowActivating = {}",
+ waitTimeMs, waitInterval, isNowActivating);
+ }
+ // Now check that it is not currently executing an activation
+ if (!isNowActivating) {
+ if (logger.isDebugEnabled()) {
+ logger.debug(
+ "handleStateChange: PROVIDING_SERVICE looks like the activation wait timer died");
+ }
+ // This will assure the timer is cancelled and rescheduled.
+ isWaitingForActivation = false;
+ }
+ }
+
+ }
- @Override
- public void handleStateChange() {
- /*
- * A note on synchronization: This method is not synchronized because the caller, stateManagememt,
- * has synchronize all of its methods. Only one stateManagement operation can occur at a time. Thus,
- * only one handleStateChange() call will ever be made at a time.
- */
- if(logger.isDebugEnabled()){
- logger.debug("handleStateChange: Entering, message={}, standbyStatus={}",
- super.getMessage(), super.getStateManagement().getStandbyStatus());
- }
- String standbyStatus = super.getStateManagement().getStandbyStatus();
- String pdpId = ActiveStandbyProperties
- .getProperty(ActiveStandbyProperties.NODE_NAME);
+ if (!isWaitingForActivation) {
+ try {
+ // Just in case there is an old timer hanging around
+ if (logger.isDebugEnabled()) {
+ logger.debug("handleStateChange: PROVIDING_SERVICE cancelling delayActivationTimer.");
+ }
+ delayActivateTimer.cancel();
+ } catch (Exception e) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("handleStateChange: PROVIDING_SERVICE no delayActivationTimer existed.", e);
+ }
+ // If you end of here, there was no active timer
+ }
+ delayActivateTimer = new Timer();
+ // delay the activate so the DesignatedWaiter can run twice
+ delayActivateTimer.schedule(new DelayActivateClass(), waitInterval);
+ isWaitingForActivation = true;
+ startTimeWaitingForActivationMs = new Date().getTime();
+ if (logger.isDebugEnabled()) {
+ logger.debug("handleStateChange: PROVIDING_SERVICE scheduling delayActivationTimer in {} ms",
+ waitInterval);
+ }
+ } else {
+ if (logger.isDebugEnabled()) {
+ logger.debug(
+ "handleStateChange: PROVIDING_SERVICE delayActivationTimer is waiting for activation.");
+ }
+ }
- if(logger.isDebugEnabled()){
- logger.debug("handleStateChange: previousStandbyStatus = {}"
- + "; standbyStatus = {}", previousStandbyStatus, standbyStatus);
- }
-
- if (standbyStatus == null || standbyStatus.equals(StateManagement.NULL_VALUE)) {
- if(logger.isDebugEnabled()){
- logger.debug("handleStateChange: standbyStatus is null; standing down PDP={}", pdpId);
- }
- if(previousStandbyStatus.equals(StateManagement.NULL_VALUE)){
- //We were just here and did this successfully
- if(logger.isDebugEnabled()){
- logger.debug("handleStateChange: Is returning because standbyStatus is null and was previously 'null'; PDP={}", pdpId);
- }
- return;
- }
- isWaitingForActivation = false;
- try{
- try{
- if(logger.isDebugEnabled()){
- logger.debug("handleStateChange: null: cancelling delayActivationTimer.");
- }
- delayActivateTimer.cancel();
- }catch(Exception e){
- if(logger.isInfoEnabled()){
- logger.info("handleStateChange: null no delayActivationTimer existed.", e);
- }
- //If you end of here, there was no active timer
- }
- //Only want to lock the endpoints, not the controllers.
- PolicyEngine.manager.deactivate();
- //The operation was fully successful, but you cannot assign it a real null value
- //because later we might try to execute previousStandbyStatus.equals() and get
- //a null pointer exception.
- previousStandbyStatus = StateManagement.NULL_VALUE;
- }catch(Exception e){
- logger.warn("handleStateChange: standbyStatus == null caught exception: ", e);
- }
- } else if (standbyStatus.equals(StateManagement.HOT_STANDBY) || standbyStatus.equals(StateManagement.COLD_STANDBY)) {
- if(logger.isDebugEnabled()){
- logger.debug("handleStateChange: standbyStatus={}; standing down PDP={}", standbyStatus, pdpId);
- }
- if(previousStandbyStatus.equals(PMStandbyStateChangeNotifier.HOTSTANDBY_OR_COLDSTANDBY)){
- //We were just here and did this successfully
- if(logger.isDebugEnabled()){
- logger.debug("handleStateChange: Is returning because standbyStatus is {}"
- + " and was previously {}; PDP= {}", standbyStatus, previousStandbyStatus, pdpId);
- }
- return;
- }
- isWaitingForActivation = false;
- try{
- try{
- if(logger.isDebugEnabled()){
- logger.debug("handleStateChange: HOT_STNDBY || COLD_STANDBY: cancelling delayActivationTimer.");
- }
- delayActivateTimer.cancel();
- }catch(Exception e){
- if(logger.isDebugEnabled()){
- logger.debug("handleStateChange: HOT_STANDBY || COLD_STANDBY no delayActivationTimer existed.", e);
- }
- //If you end of here, there was no active timer
- }
- //Only want to lock the endpoints, not the controllers.
- PolicyEngine.manager.deactivate();
- //The operation was fully successful
- previousStandbyStatus = PMStandbyStateChangeNotifier.HOTSTANDBY_OR_COLDSTANDBY;
- }catch(Exception e){
- logger.warn("handleStateChange: standbyStatus = {} caught exception: {}", standbyStatus, e.getMessage(), e);
- }
+ } catch (Exception e) {
+ logger.warn("handleStateChange: PROVIDING_SERVICE standbyStatus == providingservice caught exception: ",
+ e);
+ }
- } else if (standbyStatus.equals(StateManagement.PROVIDING_SERVICE)) {
- if(logger.isDebugEnabled()){
- logger.debug("handleStateChange: standbyStatus= {} "
- + "scheduling activation of PDP={}",standbyStatus, pdpId);
- }
- if(previousStandbyStatus.equals(StateManagement.PROVIDING_SERVICE)){
- //We were just here and did this successfully
- if(logger.isDebugEnabled()){
- logger.debug("handleStateChange: Is returning because standbyStatus is {}"
- + "and was previously {}; PDP={}", standbyStatus, previousStandbyStatus, pdpId);
- }
- return;
- }
- try{
- //UnLock all the endpoints
- if(logger.isDebugEnabled()){
- logger.debug("handleStateChange: standbyStatus={}; controllers must be unlocked.",standbyStatus );
- }
- /*
- * Only endpoints should be unlocked. Controllers have not been locked.
- * Because, sometimes, it is possible for more than one PDP-D to become active (race conditions)
- * we need to delay the activation of the topic endpoint interfaces to give the election algorithm
- * time to resolve the conflict.
- */
- if(logger.isDebugEnabled()){
- logger.debug("handleStateChange: PROVIDING_SERVICE isWaitingForActivation= {}", isWaitingForActivation);
- }
-
- //Delay activation for 2*pdpUpdateInterval+2000 ms in case of an election handler conflict.
- //You could have multiple election handlers thinking they can take over.
-
- // First let's check that the timer has not died
- if(isWaitingForActivation){
- if(logger.isDebugEnabled()){
- logger.debug("handleStateChange: PROVIDING_SERVICE isWaitingForActivation = {}", isWaitingForActivation);
- }
- long now = new Date().getTime();
- long waitTimeMs = now - startTimeWaitingForActivationMs;
- if(waitTimeMs > 3*waitInterval){
- if(logger.isDebugEnabled()){
- logger.debug("handleStateChange: PROVIDING_SERVICE looks like the activation wait timer may be hung,"
- + " waitTimeMs = {} and allowable waitInterval = {}"
- + " Checking whether it is currently in activation. isNowActivating = {}",
- waitTimeMs, waitInterval, isNowActivating);
- }
- //Now check that it is not currently executing an activation
- if(!isNowActivating){
- if(logger.isDebugEnabled()){
- logger.debug("handleStateChange: PROVIDING_SERVICE looks like the activation wait timer died");
- }
- // This will assure the timer is cancelled and rescheduled.
- isWaitingForActivation = false;
- }
- }
-
- }
-
- if(!isWaitingForActivation){
- try{
- //Just in case there is an old timer hanging around
- if(logger.isDebugEnabled()){
- logger.debug("handleStateChange: PROVIDING_SERVICE cancelling delayActivationTimer.");
- }
- delayActivateTimer.cancel();
- }catch(Exception e){
- if(logger.isDebugEnabled()){
- logger.debug("handleStateChange: PROVIDING_SERVICE no delayActivationTimer existed.", e);
- }
- //If you end of here, there was no active timer
- }
- delayActivateTimer = new Timer();
- //delay the activate so the DesignatedWaiter can run twice
- delayActivateTimer.schedule(new DelayActivateClass(), waitInterval);
- isWaitingForActivation = true;
- startTimeWaitingForActivationMs = new Date().getTime();
- if(logger.isDebugEnabled()){
- logger.debug("handleStateChange: PROVIDING_SERVICE scheduling delayActivationTimer in {} ms", waitInterval);
- }
- }else{
- if(logger.isDebugEnabled()){
- logger.debug("handleStateChange: PROVIDING_SERVICE delayActivationTimer is waiting for activation.");
- }
- }
-
- }catch(Exception e){
- logger.warn("handleStateChange: PROVIDING_SERVICE standbyStatus == providingservice caught exception: ", e);
- }
+ } else {
+ logger.error("handleStateChange: Unsupported standbyStatus={}; standing down PDP={}", standbyStatus, pdpId);
+ if (previousStandbyStatus.equals(PMStandbyStateChangeNotifier.UNSUPPORTED)) {
+ // We were just here and did this successfully
+ if (logger.isDebugEnabled()) {
+ logger.debug("handleStateChange: Is returning because standbyStatus is "
+ + "UNSUPPORTED and was previously {}; PDP={}", previousStandbyStatus, pdpId);
+ }
+ return;
+ }
+ // Only want to lock the endpoints, not the controllers.
+ isWaitingForActivation = false;
+ try {
+ try {
+ if (logger.isDebugEnabled()) {
+ logger.debug("handleStateChange: unsupported standbystatus: cancelling delayActivationTimer.");
+ }
+ delayActivateTimer.cancel();
+ } catch (Exception e) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("handleStateChange: unsupported standbystatus: no delayActivationTimer existed.",
+ e);
+ }
+ // If you end of here, there was no active timer
+ }
+ PolicyEngine.manager.deactivate();
+ // We know the standbystatus is unsupported
+ previousStandbyStatus = PMStandbyStateChangeNotifier.UNSUPPORTED;
+ } catch (Exception e) {
+ logger.warn("handleStateChange: Unsupported standbyStatus = {} " + "caught exception: {} ",
+ standbyStatus, e.getMessage(), e);
+ }
+ }
+ if (logger.isDebugEnabled()) {
+ logger.debug("handleStateChange: Exiting");
+ }
+ }
- } else {
- logger.error("handleStateChange: Unsupported standbyStatus={}; standing down PDP={}", standbyStatus, pdpId);
- if(previousStandbyStatus.equals(PMStandbyStateChangeNotifier.UNSUPPORTED)){
- //We were just here and did this successfully
- if(logger.isDebugEnabled()){
- logger.debug("handleStateChange: Is returning because standbyStatus is "
- + "UNSUPPORTED and was previously {}; PDP={}", previousStandbyStatus, pdpId);
- }
- return;
- }
- //Only want to lock the endpoints, not the controllers.
- isWaitingForActivation = false;
- try{
- try{
- if(logger.isDebugEnabled()){
- logger.debug("handleStateChange: unsupported standbystatus: cancelling delayActivationTimer.");
- }
- delayActivateTimer.cancel();
- }catch(Exception e){
- if(logger.isDebugEnabled()){
- logger.debug("handleStateChange: unsupported standbystatus: no delayActivationTimer existed.", e);
- }
- //If you end of here, there was no active timer
- }
- PolicyEngine.manager.deactivate();
- //We know the standbystatus is unsupported
- previousStandbyStatus = PMStandbyStateChangeNotifier.UNSUPPORTED;
- }catch(Exception e){
- logger.warn("handleStateChange: Unsupported standbyStatus = {} "
- + "caught exception: {} ",standbyStatus, e.getMessage(), e);
- }
- }
- if(logger.isDebugEnabled()){
- logger.debug("handleStateChange: Exiting");
- }
- }
+ private class DelayActivateClass extends TimerTask {
- private class DelayActivateClass extends TimerTask{
+ private Object delayActivateLock = new Object();
- private Object delayActivateLock = new Object();
+ @Override
+ public void run() {
+ isNowActivating = true;
+ try {
+ if (logger.isDebugEnabled()) {
+ logger.debug("DelayActivateClass.run: entry");
+ }
+ synchronized (delayActivateLock) {
+ PolicyEngine.manager.activate();
+ // The state change fully succeeded
+ previousStandbyStatus = StateManagement.PROVIDING_SERVICE;
+ // We want to set this to false here because the activate call can take a while
+ isWaitingForActivation = false;
+ isNowActivating = false;
+ }
+ if (logger.isDebugEnabled()) {
+ logger.debug("DelayActivateClass.run.exit");
+ }
+ } catch (Exception e) {
+ isWaitingForActivation = false;
+ isNowActivating = false;
+ logger.warn("DelayActivateClass.run: caught an unexpected exception "
+ + "calling PolicyEngine.manager.activate: ", e);
+ }
+ }
+ }
- @Override
- public void run() {
- isNowActivating = true;
- try{
- if(logger.isDebugEnabled()){
- logger.debug("DelayActivateClass.run: entry");
- }
- synchronized(delayActivateLock){
- PolicyEngine.manager.activate();
- // The state change fully succeeded
- previousStandbyStatus = StateManagement.PROVIDING_SERVICE;
- // We want to set this to false here because the activate call can take a while
- isWaitingForActivation = false;
- isNowActivating = false;
- }
- if(logger.isDebugEnabled()){
- logger.debug("DelayActivateClass.run.exit");
- }
- }catch(Exception e){
- isWaitingForActivation = false;
- isNowActivating = false;
- logger.warn("DelayActivateClass.run: caught an unexpected exception "
- + "calling PolicyEngine.manager.activate: ", e);
- }
- }
- }
-
- public String getPreviousStandbyStatus(){
- return previousStandbyStatus;
- }
+ public String getPreviousStandbyStatus() {
+ return previousStandbyStatus;
+ }
}
diff --git a/feature-healthcheck/src/main/java/org/onap/policy/drools/healthcheck/HealthCheck.java b/feature-healthcheck/src/main/java/org/onap/policy/drools/healthcheck/HealthCheck.java
index 9c543412..f4cb1cc6 100644
--- a/feature-healthcheck/src/main/java/org/onap/policy/drools/healthcheck/HealthCheck.java
+++ b/feature-healthcheck/src/main/java/org/onap/policy/drools/healthcheck/HealthCheck.java
@@ -28,9 +28,7 @@ import javax.ws.rs.core.Response;
import org.onap.policy.common.capabilities.Startable;
import org.onap.policy.common.endpoints.http.client.HttpClient;
-import org.onap.policy.common.endpoints.http.client.impl.IndexedHttpClientFactory;
import org.onap.policy.common.endpoints.http.server.HttpServletServer;
-import org.onap.policy.common.endpoints.http.server.impl.IndexedHttpServletServerFactory;
import org.onap.policy.drools.persistence.SystemPersistence;
import org.onap.policy.drools.system.PolicyEngine;
import org.slf4j.Logger;
@@ -252,8 +250,8 @@ class HealthCheckMonitor implements HealthCheck {
try {
this.healthCheckProperties =
SystemPersistence.manager.getProperties(HealthCheckFeature.CONFIGURATION_PROPERTIES_NAME);
- this.servers = IndexedHttpServletServerFactory.getInstance().build(healthCheckProperties);
- this.clients = IndexedHttpClientFactory.getInstance().build(healthCheckProperties);
+ this.servers = HttpServletServer.factory.build(healthCheckProperties);
+ this.clients = HttpClient.factory.build(healthCheckProperties);
for (HttpServletServer server : servers) {
startServer(server);
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/DmaapManager.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/DmaapManager.java
index 8780eefc..278e7fdc 100644
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/DmaapManager.java
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/DmaapManager.java
@@ -27,7 +27,6 @@ import org.onap.policy.common.endpoints.event.comm.TopicEndpoint;
import org.onap.policy.common.endpoints.event.comm.TopicListener;
import org.onap.policy.common.endpoints.event.comm.TopicSink;
import org.onap.policy.common.endpoints.event.comm.TopicSource;
-import org.onap.policy.common.endpoints.event.comm.impl.ProxyTopicEndpointManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -263,14 +262,14 @@ public class DmaapManager {
* @return the topic sources
*/
public List<TopicSource> getTopicSources() {
- return ProxyTopicEndpointManager.getInstance().getTopicSources();
+ return TopicEndpoint.manager.getTopicSources();
}
/**
* @return the topic sinks
*/
public List<TopicSink> getTopicSinks() {
- return ProxyTopicEndpointManager.getInstance().getTopicSinks();
+ return TopicEndpoint.manager.getTopicSinks();
}
}
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingFeature.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingFeature.java
index 815dc548..ad6a1c56 100644
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingFeature.java
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingFeature.java
@@ -27,9 +27,9 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
+import org.onap.policy.common.endpoints.event.comm.TopicEndpoint;
import org.onap.policy.common.endpoints.event.comm.TopicSink;
import org.onap.policy.common.endpoints.event.comm.TopicSource;
-import org.onap.policy.common.endpoints.event.comm.impl.ProxyTopicEndpointManager;
import org.onap.policy.common.utils.properties.SpecProperties;
import org.onap.policy.common.utils.properties.exception.PropertyException;
import org.onap.policy.drools.controller.DroolsController;
@@ -432,7 +432,7 @@ public class PoolingFeature implements PolicyEngineFeatureAPI, PolicyControllerF
* @return the topic sources
*/
public List<TopicSource> initTopicSources(Properties props) {
- return ProxyTopicEndpointManager.getInstance().addTopicSources(props);
+ return TopicEndpoint.manager.addTopicSources(props);
}
/**
@@ -442,7 +442,7 @@ public class PoolingFeature implements PolicyEngineFeatureAPI, PolicyControllerF
* @return the topic sinks
*/
public List<TopicSink> initTopicSinks(Properties props) {
- return ProxyTopicEndpointManager.getInstance().addTopicSinks(props);
+ return TopicEndpoint.manager.addTopicSinks(props);
}
}
}
diff --git a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/FeatureTest2.java b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/FeatureTest2.java
index f25f3d3d..70bacb1b 100644
--- a/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/FeatureTest2.java
+++ b/feature-pooling-dmaap/src/test/java/org/onap/policy/drools/pooling/FeatureTest2.java
@@ -53,10 +53,10 @@ import org.junit.Test;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
+import org.onap.policy.common.endpoints.event.comm.TopicEndpoint;
import org.onap.policy.common.endpoints.event.comm.TopicListener;
import org.onap.policy.common.endpoints.event.comm.TopicSink;
import org.onap.policy.common.endpoints.event.comm.TopicSource;
-import org.onap.policy.common.endpoints.event.comm.impl.ProxyTopicEndpointManager;
import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
import org.onap.policy.drools.controller.DroolsController;
import org.onap.policy.drools.system.PolicyController;
@@ -142,10 +142,10 @@ public class FeatureTest2 {
saveManagerFactory = PoolingManagerImpl.getFactory();
saveDmaapFactory = DmaapManager.getFactory();
- externalSink = ProxyTopicEndpointManager.getInstance().addTopicSinks(makeSinkProperties(EXTERNAL_TOPIC)).get(0);
+ externalSink = TopicEndpoint.manager.addTopicSinks(makeSinkProperties(EXTERNAL_TOPIC)).get(0);
externalSink.start();
- internalSink = ProxyTopicEndpointManager.getInstance().addTopicSinks(makeSinkProperties(INTERNAL_TOPIC)).get(0);
+ internalSink = TopicEndpoint.manager.addTopicSinks(makeSinkProperties(INTERNAL_TOPIC)).get(0);
internalSink.start();
}
@@ -223,9 +223,8 @@ public class FeatureTest2 {
+ PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX, UEB_SERVERS);
props.setProperty(PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS + "." + topic
+ PolicyEndPointProperties.PROPERTY_TOPIC_SINK_PARTITION_KEY_SUFFIX, "0");
- props.setProperty(
- PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS + "." + topic + PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX,
- "false");
+ props.setProperty(PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS + "." + topic
+ + PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX, "false");
return props;
}
@@ -239,9 +238,8 @@ public class FeatureTest2 {
+ PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX, UEB_SERVERS);
props.setProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS + "." + topic
+ PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_FETCH_LIMIT_SUFFIX, FETCH_LIMIT);
- props.setProperty(
- PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS + "." + topic + PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX,
- "false");
+ props.setProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS + "." + topic
+ + PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX, "false");
if (EXTERNAL_TOPIC.equals(topic)) {
// consumer group is a constant
@@ -467,10 +465,8 @@ public class FeatureTest2 {
when(controller.getName()).thenReturn(CONTROLLER1);
when(controller.getDrools()).thenReturn(drools);
- externalSource = ProxyTopicEndpointManager.getInstance()
- .addTopicSources(makeSourceProperties(EXTERNAL_TOPIC)).get(0);
- internalSource = ProxyTopicEndpointManager.getInstance()
- .addTopicSources(makeSourceProperties(INTERNAL_TOPIC)).get(0);
+ externalSource = TopicEndpoint.manager.addTopicSources(makeSourceProperties(EXTERNAL_TOPIC)).get(0);
+ internalSource = TopicEndpoint.manager.addTopicSources(makeSourceProperties(INTERNAL_TOPIC)).get(0);
// stop consuming events if the controller stops
when(controller.stop()).thenAnswer(args -> {
diff --git a/feature-simulators/src/test/java/org/onap/policy/drools/simulators/DMaaPSimulatorTest.java b/feature-simulators/src/test/java/org/onap/policy/drools/simulators/DMaaPSimulatorTest.java
index 1ba2fc44..1a19284b 100644
--- a/feature-simulators/src/test/java/org/onap/policy/drools/simulators/DMaaPSimulatorTest.java
+++ b/feature-simulators/src/test/java/org/onap/policy/drools/simulators/DMaaPSimulatorTest.java
@@ -37,7 +37,6 @@ import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.onap.policy.common.endpoints.http.server.HttpServletServer;
-import org.onap.policy.common.endpoints.http.server.impl.IndexedHttpServletServerFactory;
import org.onap.policy.common.utils.network.NetworkUtil;
import org.onap.policy.drools.utils.logging.LoggerUtil;
@@ -50,8 +49,8 @@ public class DMaaPSimulatorTest {
LoggerUtil.setLevel("ROOT", "INFO");
LoggerUtil.setLevel("org.eclipse.jetty", "WARN");
try {
- final HttpServletServer testServer = IndexedHttpServletServerFactory.getInstance().build("dmaapSim",
- "localhost", DMAAPSIM_SERVER_PORT, "/", false, true);
+ final HttpServletServer testServer =
+ HttpServletServer.factory.build("dmaapSim", "localhost", DMAAPSIM_SERVER_PORT, "/", false, true);
testServer.addServletClass("/*", DMaaPSimulatorJaxRs.class.getName());
testServer.waitedStart(5000);
if (!NetworkUtil.isTcpPortOpen("localhost", testServer.getPort(), 5, 10000L)) {
@@ -64,7 +63,7 @@ public class DMaaPSimulatorTest {
@AfterClass
public static void tearDownSimulator() {
- IndexedHttpServletServerFactory.getInstance().destroy();
+ HttpServletServer.factory.destroy();
}
@Test
diff --git a/feature-state-management/src/main/java/org/onap/policy/drools/statemanagement/DroolsPDPIntegrityMonitor.java b/feature-state-management/src/main/java/org/onap/policy/drools/statemanagement/DroolsPDPIntegrityMonitor.java
index 8fdd3a4d..a7606eb2 100644
--- a/feature-state-management/src/main/java/org/onap/policy/drools/statemanagement/DroolsPDPIntegrityMonitor.java
+++ b/feature-state-management/src/main/java/org/onap/policy/drools/statemanagement/DroolsPDPIntegrityMonitor.java
@@ -26,7 +26,6 @@ import java.util.Properties;
import org.onap.policy.common.capabilities.Startable;
import org.onap.policy.common.endpoints.http.server.HttpServletServer;
-import org.onap.policy.common.endpoints.http.server.impl.IndexedHttpServletServerFactory;
import org.onap.policy.common.im.IntegrityMonitor;
import org.onap.policy.common.im.IntegrityMonitorException;
import org.onap.policy.drools.utils.PropertyUtil;
@@ -372,8 +371,7 @@ public class DroolsPDPIntegrityMonitor extends IntegrityMonitor {
@Override
public boolean start() {
try {
- List<HttpServletServer> servers =
- IndexedHttpServletServerFactory.getInstance().build(integrityMonitorRestServerProperties);
+ List<HttpServletServer> servers = HttpServletServer.factory.build(integrityMonitorRestServerProperties);
if (!servers.isEmpty()) {
server = servers.get(0);
diff --git a/policy-management/src/main/java/org/onap/policy/drools/server/restful/RestManager.java b/policy-management/src/main/java/org/onap/policy/drools/server/restful/RestManager.java
index 26bed5e4..00da884e 100644
--- a/policy-management/src/main/java/org/onap/policy/drools/server/restful/RestManager.java
+++ b/policy-management/src/main/java/org/onap/policy/drools/server/restful/RestManager.java
@@ -53,7 +53,6 @@ import org.onap.policy.common.endpoints.event.comm.bus.DmaapTopicSource;
import org.onap.policy.common.endpoints.event.comm.bus.NoopTopicSink;
import org.onap.policy.common.endpoints.event.comm.bus.UebTopicSink;
import org.onap.policy.common.endpoints.event.comm.bus.UebTopicSource;
-import org.onap.policy.common.endpoints.event.comm.impl.ProxyTopicEndpointManager;
import org.onap.policy.drools.controller.DroolsController;
import org.onap.policy.drools.features.PolicyControllerFeatureAPI;
import org.onap.policy.drools.features.PolicyEngineFeatureAPI;
@@ -1451,7 +1450,7 @@ public class RestManager {
@ApiOperation(value = "Retrieves the managed topics", notes = "Network Topics Aggregation",
response = TopicEndpoint.class)
public Response topics() {
- return Response.status(Response.Status.OK).entity(ProxyTopicEndpointManager.getInstance()).build();
+ return Response.status(Response.Status.OK).entity(TopicEndpoint.manager).build();
}
@GET
@@ -1469,9 +1468,9 @@ public class RestManager {
@ApiResponses(value = {@ApiResponse(code = 406,
message = "The system is an administrative state that prevents " + "this request to be fulfilled")})
public Response topicsLock() {
- final boolean success = ProxyTopicEndpointManager.getInstance().lock();
+ final boolean success = TopicEndpoint.manager.lock();
if (success) {
- return Response.status(Status.OK).entity(ProxyTopicEndpointManager.getInstance()).build();
+ return Response.status(Status.OK).entity(TopicEndpoint.manager).build();
} else {
return Response.status(Status.NOT_ACCEPTABLE).entity(new Error("cannot perform operation")).build();
}
@@ -1484,9 +1483,9 @@ public class RestManager {
@ApiResponses(value = {@ApiResponse(code = 406,
message = "The system is an administrative state that prevents " + "this request to be fulfilled")})
public Response topicsUnlock() {
- final boolean success = ProxyTopicEndpointManager.getInstance().unlock();
+ final boolean success = TopicEndpoint.manager.unlock();
if (success) {
- return Response.status(Status.OK).entity(ProxyTopicEndpointManager.getInstance()).build();
+ return Response.status(Status.OK).entity(TopicEndpoint.manager).build();
} else {
return Response.status(Status.NOT_ACCEPTABLE).entity(new Error("cannot perform operation")).build();
}
@@ -1497,8 +1496,7 @@ public class RestManager {
@ApiOperation(value = "Retrieves the managed topic sources", notes = "Network Topic Sources Agregation",
responseContainer = "List", response = TopicSource.class)
public Response sources() {
- return Response.status(Response.Status.OK).entity(ProxyTopicEndpointManager.getInstance().getTopicSources())
- .build();
+ return Response.status(Response.Status.OK).entity(TopicEndpoint.manager.getTopicSources()).build();
}
@GET
@@ -1506,8 +1504,7 @@ public class RestManager {
@ApiOperation(value = "Retrieves the managed topic sinks", notes = "Network Topic Sinks Agregation",
responseContainer = "List", response = TopicSink.class)
public Response sinks() {
- return Response.status(Response.Status.OK).entity(ProxyTopicEndpointManager.getInstance().getTopicSinks())
- .build();
+ return Response.status(Response.Status.OK).entity(TopicEndpoint.manager.getTopicSinks()).build();
}
@GET
@@ -1515,8 +1512,7 @@ public class RestManager {
@ApiOperation(value = "Retrieves the UEB managed topic sources", notes = "UEB Topic Sources Agregation",
responseContainer = "List", response = UebTopicSource.class)
public Response uebSources() {
- return Response.status(Response.Status.OK).entity(ProxyTopicEndpointManager.getInstance().getUebTopicSources())
- .build();
+ return Response.status(Response.Status.OK).entity(TopicEndpoint.manager.getUebTopicSources()).build();
}
@GET
@@ -1524,8 +1520,7 @@ public class RestManager {
@ApiOperation(value = "Retrieves the UEB managed topic sinks", notes = "UEB Topic Sinks Agregation",
responseContainer = "List", response = UebTopicSink.class)
public Response uebSinks() {
- return Response.status(Response.Status.OK).entity(ProxyTopicEndpointManager.getInstance().getUebTopicSinks())
- .build();
+ return Response.status(Response.Status.OK).entity(TopicEndpoint.manager.getUebTopicSinks()).build();
}
@GET
@@ -1533,8 +1528,7 @@ public class RestManager {
@ApiOperation(value = "Retrieves the DMaaP managed topic sources", notes = "DMaaP Topic Sources Agregation",
responseContainer = "List", response = DmaapTopicSource.class)
public Response dmaapSources() {
- return Response.status(Response.Status.OK)
- .entity(ProxyTopicEndpointManager.getInstance().getDmaapTopicSources()).build();
+ return Response.status(Response.Status.OK).entity(TopicEndpoint.manager.getDmaapTopicSources()).build();
}
@GET
@@ -1542,8 +1536,7 @@ public class RestManager {
@ApiOperation(value = "Retrieves the DMaaP managed topic sinks", notes = "DMaaP Topic Sinks Agregation",
responseContainer = "List", response = DmaapTopicSink.class)
public Response dmaapSinks() {
- return Response.status(Response.Status.OK).entity(ProxyTopicEndpointManager.getInstance().getDmaapTopicSinks())
- .build();
+ return Response.status(Response.Status.OK).entity(TopicEndpoint.manager.getDmaapTopicSinks()).build();
}
@GET
@@ -1552,8 +1545,7 @@ public class RestManager {
notes = "This is an UEB Network Communicaton Endpoint source of messages for the Engine",
response = UebTopicSource.class)
public Response uebSourceTopic(@ApiParam(value = "Topic Name", required = true) @PathParam("topic") String topic) {
- return Response.status(Response.Status.OK)
- .entity(ProxyTopicEndpointManager.getInstance().getUebTopicSource(topic)).build();
+ return Response.status(Response.Status.OK).entity(TopicEndpoint.manager.getUebTopicSource(topic)).build();
}
@GET
@@ -1562,8 +1554,7 @@ public class RestManager {
notes = "This is an UEB Network Communicaton Endpoint destination of messages from the Engine",
response = UebTopicSink.class)
public Response uebSinkTopic(@ApiParam(value = "Topic Name", required = true) @PathParam("topic") String topic) {
- return Response.status(Response.Status.OK)
- .entity(ProxyTopicEndpointManager.getInstance().getUebTopicSink(topic)).build();
+ return Response.status(Response.Status.OK).entity(TopicEndpoint.manager.getUebTopicSink(topic)).build();
}
@GET
@@ -1573,8 +1564,7 @@ public class RestManager {
response = DmaapTopicSource.class)
public Response dmaapSourceTopic(
@ApiParam(value = "Topic Name", required = true) @PathParam("topic") String topic) {
- return Response.status(Response.Status.OK)
- .entity(ProxyTopicEndpointManager.getInstance().getDmaapTopicSource(topic)).build();
+ return Response.status(Response.Status.OK).entity(TopicEndpoint.manager.getDmaapTopicSource(topic)).build();
}
@GET
@@ -1583,8 +1573,7 @@ public class RestManager {
notes = "This is a DMaaP Network Communicaton Endpoint destination of messages from the Engine",
response = DmaapTopicSink.class)
public Response dmaapSinkTopic(@ApiParam(value = "Topic Name", required = true) @PathParam("topic") String topic) {
- return Response.status(Response.Status.OK)
- .entity(ProxyTopicEndpointManager.getInstance().getDmaapTopicSink(topic)).build();
+ return Response.status(Response.Status.OK).entity(TopicEndpoint.manager.getDmaapTopicSink(topic)).build();
}
@GET
@@ -1594,9 +1583,7 @@ public class RestManager {
responseContainer = "List")
public Response uebSourceEvents(@ApiParam(value = "Topic Name", required = true) @PathParam("topic") String topic) {
return Response.status(Status.OK)
- .entity(Arrays
- .asList(ProxyTopicEndpointManager.getInstance().getUebTopicSource(topic).getRecentEvents()))
- .build();
+ .entity(Arrays.asList(TopicEndpoint.manager.getUebTopicSource(topic).getRecentEvents())).build();
}
@GET
@@ -1606,8 +1593,7 @@ public class RestManager {
responseContainer = "List")
public Response uebSinkEvents(@ApiParam(value = "Topic Name", required = true) @PathParam("topic") String topic) {
return Response.status(Status.OK)
- .entity(Arrays.asList(ProxyTopicEndpointManager.getInstance().getUebTopicSink(topic).getRecentEvents()))
- .build();
+ .entity(Arrays.asList(TopicEndpoint.manager.getUebTopicSink(topic).getRecentEvents())).build();
}
@GET
@@ -1618,9 +1604,7 @@ public class RestManager {
public Response dmaapSourceEvents(
@ApiParam(value = "Topic Name", required = true) @PathParam("topic") String topic) {
return Response.status(Status.OK)
- .entity(Arrays
- .asList(ProxyTopicEndpointManager.getInstance().getDmaapTopicSource(topic).getRecentEvents()))
- .build();
+ .entity(Arrays.asList(TopicEndpoint.manager.getDmaapTopicSource(topic).getRecentEvents())).build();
}
@GET
@@ -1630,9 +1614,7 @@ public class RestManager {
responseContainer = "List")
public Response dmaapSinkEvents(@PathParam("topic") String topic) {
return Response.status(Status.OK)
- .entity(Arrays
- .asList(ProxyTopicEndpointManager.getInstance().getDmaapTopicSink(topic).getRecentEvents()))
- .build();
+ .entity(Arrays.asList(TopicEndpoint.manager.getDmaapTopicSink(topic).getRecentEvents())).build();
}
@GET
@@ -1640,8 +1622,7 @@ public class RestManager {
@ApiOperation(value = "Retrieves the NOOP managed topic sinks", notes = "NOOP Topic Sinks Agregation",
responseContainer = "List", response = NoopTopicSink.class)
public Response noopSinks() {
- return Response.status(Response.Status.OK).entity(ProxyTopicEndpointManager.getInstance().getNoopTopicSinks())
- .build();
+ return Response.status(Response.Status.OK).entity(TopicEndpoint.manager.getNoopTopicSinks()).build();
}
@GET
@@ -1649,8 +1630,7 @@ public class RestManager {
@ApiOperation(value = "Retrieves a NOOP managed topic sink",
notes = "NOOP is an dev/null Network Communicaton Sink", response = NoopTopicSink.class)
public Response noopSinkTopic(@ApiParam(value = "Topic Name", required = true) @PathParam("topic") String topic) {
- return Response.status(Response.Status.OK)
- .entity(ProxyTopicEndpointManager.getInstance().getNoopTopicSink(topic)).build();
+ return Response.status(Response.Status.OK).entity(TopicEndpoint.manager.getNoopTopicSink(topic)).build();
}
@GET
@@ -1659,9 +1639,7 @@ public class RestManager {
notes = "NOOP is an dev/null Network Communicaton Sink", responseContainer = "List")
public Response noopSinkEvents(@PathParam("topic") String topic) {
return Response.status(Status.OK)
- .entity(Arrays
- .asList(ProxyTopicEndpointManager.getInstance().getNoopTopicSink(topic).getRecentEvents()))
- .build();
+ .entity(Arrays.asList(TopicEndpoint.manager.getNoopTopicSink(topic).getRecentEvents())).build();
}
@GET
@@ -1678,7 +1656,7 @@ public class RestManager {
@ApiResponses(value = {@ApiResponse(code = 406,
message = "The system is an administrative state that prevents " + "this request to be fulfilled")})
public Response uebTopicLock(@ApiParam(value = "Topic Name", required = true) @PathParam("topic") String topic) {
- final UebTopicSource source = ProxyTopicEndpointManager.getInstance().getUebTopicSource(topic);
+ final UebTopicSource source = TopicEndpoint.manager.getUebTopicSource(topic);
final boolean success = source.lock();
if (success) {
return Response.status(Status.OK).entity(source).build();
@@ -1693,7 +1671,7 @@ public class RestManager {
@ApiResponses(value = {@ApiResponse(code = 406,
message = "The system is an administrative state that prevents " + "this request to be fulfilled")})
public Response uebTopicUnlock(@ApiParam(value = "Topic Name", required = true) @PathParam("topic") String topic) {
- final UebTopicSource source = ProxyTopicEndpointManager.getInstance().getUebTopicSource(topic);
+ final UebTopicSource source = TopicEndpoint.manager.getUebTopicSource(topic);
final boolean success = source.unlock();
if (success) {
return Response.status(Status.OK).entity(source).build();
@@ -1720,7 +1698,7 @@ public class RestManager {
@ApiResponses(value = {@ApiResponse(code = 406,
message = "The system is an administrative state that prevents " + "this request to be fulfilled")})
public Response dmmapTopicLock(@ApiParam(value = "Topic Name", required = true) @PathParam("topic") String topic) {
- final DmaapTopicSource source = ProxyTopicEndpointManager.getInstance().getDmaapTopicSource(topic);
+ final DmaapTopicSource source = TopicEndpoint.manager.getDmaapTopicSource(topic);
final boolean success = source.lock();
if (success) {
return Response.status(Status.OK).entity(source).build();
@@ -1736,7 +1714,7 @@ public class RestManager {
message = "The system is an administrative state that prevents " + "this request to be fulfilled")})
public Response dmaapTopicUnlock(
@ApiParam(value = "Topic Name", required = true) @PathParam("topic") String topic) {
- final DmaapTopicSource source = ProxyTopicEndpointManager.getInstance().getDmaapTopicSource(topic);
+ final DmaapTopicSource source = TopicEndpoint.manager.getDmaapTopicSource(topic);
final boolean success = source.unlock();
if (success) {
return Response.status(Status.OK).entity(source).build();
@@ -1757,12 +1735,11 @@ public class RestManager {
public Response uebOffer(@ApiParam(value = "Topic Name", required = true) @PathParam("topic") String topic,
@ApiParam(value = "Network Message", required = true) String json) {
try {
- final UebTopicSource uebReader = ProxyTopicEndpointManager.getInstance().getUebTopicSource(topic);
+ final UebTopicSource uebReader = TopicEndpoint.manager.getUebTopicSource(topic);
final boolean success = uebReader.offer(json);
if (success) {
return Response.status(Status.OK)
- .entity(Arrays.asList(
- ProxyTopicEndpointManager.getInstance().getUebTopicSource(topic).getRecentEvents()))
+ .entity(Arrays.asList(TopicEndpoint.manager.getUebTopicSource(topic).getRecentEvents()))
.build();
} else {
return Response.status(Status.NOT_ACCEPTABLE).entity(new Error("Failure to inject event over " + topic))
@@ -1797,12 +1774,11 @@ public class RestManager {
public Response dmaapOffer(@ApiParam(value = "Topic Name", required = true) @PathParam("topic") String topic,
@ApiParam(value = "Network Message", required = true) String json) {
try {
- final DmaapTopicSource dmaapReader = ProxyTopicEndpointManager.getInstance().getDmaapTopicSource(topic);
+ final DmaapTopicSource dmaapReader = TopicEndpoint.manager.getDmaapTopicSource(topic);
final boolean success = dmaapReader.offer(json);
if (success) {
return Response.status(Status.OK)
- .entity(Arrays.asList(
- ProxyTopicEndpointManager.getInstance().getDmaapTopicSource(topic).getRecentEvents()))
+ .entity(Arrays.asList(TopicEndpoint.manager.getDmaapTopicSource(topic).getRecentEvents()))
.build();
} else {
return Response.status(Status.NOT_ACCEPTABLE).entity(new Error("Failure to inject event over " + topic))
diff --git a/policy-management/src/main/java/org/onap/policy/drools/system/PolicyEngine.java b/policy-management/src/main/java/org/onap/policy/drools/system/PolicyEngine.java
index 71c509c2..4be85022 100644
--- a/policy-management/src/main/java/org/onap/policy/drools/system/PolicyEngine.java
+++ b/policy-management/src/main/java/org/onap/policy/drools/system/PolicyEngine.java
@@ -33,12 +33,11 @@ import org.onap.policy.common.capabilities.Lockable;
import org.onap.policy.common.capabilities.Startable;
import org.onap.policy.common.endpoints.event.comm.Topic;
import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
+import org.onap.policy.common.endpoints.event.comm.TopicEndpoint;
import org.onap.policy.common.endpoints.event.comm.TopicListener;
import org.onap.policy.common.endpoints.event.comm.TopicSink;
import org.onap.policy.common.endpoints.event.comm.TopicSource;
-import org.onap.policy.common.endpoints.event.comm.impl.ProxyTopicEndpointManager;
import org.onap.policy.common.endpoints.http.server.HttpServletServer;
-import org.onap.policy.common.endpoints.http.server.impl.IndexedHttpServletServerFactory;
import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
import org.onap.policy.drools.controller.DroolsController;
import org.onap.policy.drools.core.PolicyContainer;
@@ -499,7 +498,7 @@ class PolicyEngineManager implements PolicyEngine {
this.properties = properties;
try {
- this.sources = ProxyTopicEndpointManager.getInstance().addTopicSources(properties);
+ this.sources = TopicEndpoint.manager.addTopicSources(properties);
for (final TopicSource source : this.sources) {
source.register(this);
}
@@ -508,13 +507,13 @@ class PolicyEngineManager implements PolicyEngine {
}
try {
- this.sinks = ProxyTopicEndpointManager.getInstance().addTopicSinks(properties);
+ this.sinks = TopicEndpoint.manager.addTopicSinks(properties);
} catch (final IllegalArgumentException e) {
logger.error("{}: add-sinks failed", this, e);
}
try {
- this.httpServers = IndexedHttpServletServerFactory.getInstance().build(properties);
+ this.httpServers = HttpServletServer.factory.build(properties);
} catch (final IllegalArgumentException e) {
logger.error("{}: add-http-servers failed", this, e);
}
@@ -817,7 +816,7 @@ class PolicyEngineManager implements PolicyEngine {
/* Start managed Topic Endpoints */
try {
- if (!ProxyTopicEndpointManager.getInstance().start()) {
+ if (!TopicEndpoint.manager.start()) {
success = false;
}
} catch (final IllegalStateException e) {
@@ -903,7 +902,7 @@ class PolicyEngineManager implements PolicyEngine {
}
/* stop all managed topics sources and sinks */
- if (!ProxyTopicEndpointManager.getInstance().stop()) {
+ if (!TopicEndpoint.manager.stop()) {
success = false;
}
@@ -1009,8 +1008,8 @@ class PolicyEngineManager implements PolicyEngine {
/* Shutdown managed resources */
PolicyController.factory.shutdown();
- ProxyTopicEndpointManager.getInstance().shutdown();
- IndexedHttpServletServerFactory.getInstance().destroy();
+ TopicEndpoint.manager.shutdown();
+ HttpServletServer.factory.destroy();
// Stop the JMX listener
@@ -1069,7 +1068,7 @@ class PolicyEngineManager implements PolicyEngine {
}
}
- success = ProxyTopicEndpointManager.getInstance().lock() && success;
+ success = TopicEndpoint.manager.lock() && success;
/* policy-engine dispatch post lock hook */
for (final PolicyEngineFeatureAPI feature : PolicyEngineFeatureAPI.providers.getList()) {
@@ -1119,7 +1118,7 @@ class PolicyEngineManager implements PolicyEngine {
}
}
- success = ProxyTopicEndpointManager.getInstance().unlock() && success;
+ success = TopicEndpoint.manager.unlock() && success;
/* policy-engine dispatch after unlock hook */
for (final PolicyEngineFeatureAPI feature : PolicyEngineFeatureAPI.providers.getList()) {
@@ -1255,7 +1254,7 @@ class PolicyEngineManager implements PolicyEngine {
throw new IllegalStateException(ENGINE_LOCKED_MSG);
}
- final List<? extends TopicSink> topicSinks = ProxyTopicEndpointManager.getInstance().getTopicSinks(topic);
+ final List<? extends TopicSink> topicSinks = TopicEndpoint.manager.getTopicSinks(topic);
if (topicSinks == null || topicSinks.isEmpty() || topicSinks.size() > 1) {
throw new IllegalStateException("Cannot ensure correct delivery on topic " + topic + ": " + topicSinks);
}
@@ -1376,7 +1375,7 @@ class PolicyEngineManager implements PolicyEngine {
}
try {
- final TopicSink sink = ProxyTopicEndpointManager.getInstance().getTopicSink(busType, topic);
+ final TopicSink sink = TopicEndpoint.manager.getTopicSink(busType, topic);
if (sink == null) {
throw new IllegalStateException("Inconsistent State: " + this);
diff --git a/policy-management/src/main/java/org/onap/policy/drools/system/internal/AggregatedPolicyController.java b/policy-management/src/main/java/org/onap/policy/drools/system/internal/AggregatedPolicyController.java
index 41408258..5172fc3d 100644
--- a/policy-management/src/main/java/org/onap/policy/drools/system/internal/AggregatedPolicyController.java
+++ b/policy-management/src/main/java/org/onap/policy/drools/system/internal/AggregatedPolicyController.java
@@ -27,10 +27,10 @@ import java.util.List;
import java.util.Properties;
import org.onap.policy.common.endpoints.event.comm.Topic;
+import org.onap.policy.common.endpoints.event.comm.TopicEndpoint;
import org.onap.policy.common.endpoints.event.comm.TopicListener;
import org.onap.policy.common.endpoints.event.comm.TopicSink;
import org.onap.policy.common.endpoints.event.comm.TopicSource;
-import org.onap.policy.common.endpoints.event.comm.impl.ProxyTopicEndpointManager;
import org.onap.policy.drools.controller.DroolsController;
import org.onap.policy.drools.features.PolicyControllerFeatureAPI;
import org.onap.policy.drools.persistence.SystemPersistence;
@@ -115,8 +115,8 @@ public class AggregatedPolicyController implements PolicyController, TopicListen
// Create/Reuse Readers/Writers for all event sources endpoints
- this.sources = ProxyTopicEndpointManager.getInstance().addTopicSources(properties);
- this.sinks = ProxyTopicEndpointManager.getInstance().addTopicSinks(properties);
+ this.sources = TopicEndpoint.manager.addTopicSources(properties);
+ this.sinks = TopicEndpoint.manager.addTopicSinks(properties);
initDrools(properties);
initSinks();
diff --git a/policy-management/src/test/java/org/onap/policy/drools/protocol/coders/EventProtocolCoderTest.java b/policy-management/src/test/java/org/onap/policy/drools/protocol/coders/EventProtocolCoderTest.java
index a23820cf..b42c64b1 100644
--- a/policy-management/src/test/java/org/onap/policy/drools/protocol/coders/EventProtocolCoderTest.java
+++ b/policy-management/src/test/java/org/onap/policy/drools/protocol/coders/EventProtocolCoderTest.java
@@ -26,7 +26,7 @@ import static org.junit.Assert.assertTrue;
import java.util.Properties;
import org.junit.Test;
-import org.onap.policy.common.endpoints.event.comm.impl.ProxyTopicEndpointManager;
+import org.onap.policy.common.endpoints.event.comm.TopicEndpoint;
import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
import org.onap.policy.drools.protocol.configuration.DroolsConfiguration;
@@ -82,7 +82,7 @@ public class EventProtocolCoderTest {
final Properties noopSinkProperties = new Properties();
noopSinkProperties.put(PolicyEndPointProperties.PROPERTY_NOOP_SINK_TOPICS, NOOP_TOPIC);
- ProxyTopicEndpointManager.getInstance().addTopicSinks(noopSinkProperties);
+ TopicEndpoint.manager.addTopicSinks(noopSinkProperties);
EventProtocolCoder.manager.addEncoder(ENCODER_GROUP, ENCODER_ARTIFACT, NOOP_TOPIC,
DroolsConfiguration.class.getCanonicalName(), new JsonProtocolFilter(), null, null,
diff --git a/policy-management/src/test/java/org/onap/policy/drools/protocol/coders/ProtocolCoderToolsetTest.java b/policy-management/src/test/java/org/onap/policy/drools/protocol/coders/ProtocolCoderToolsetTest.java
index 02b09bb1..aa04f407 100644
--- a/policy-management/src/test/java/org/onap/policy/drools/protocol/coders/ProtocolCoderToolsetTest.java
+++ b/policy-management/src/test/java/org/onap/policy/drools/protocol/coders/ProtocolCoderToolsetTest.java
@@ -36,8 +36,8 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.kie.api.builder.ReleaseId;
+import org.onap.policy.common.endpoints.event.comm.TopicEndpoint;
import org.onap.policy.common.endpoints.event.comm.TopicSink;
-import org.onap.policy.common.endpoints.event.comm.impl.ProxyTopicEndpointManager;
import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
import org.onap.policy.drools.controller.DroolsController;
import org.onap.policy.drools.controller.internal.MavenDroolsControllerTest;
@@ -242,7 +242,7 @@ public class ProtocolCoderToolsetTest {
Properties sinkConfig = new Properties();
sinkConfig.put(PolicyEndPointProperties.PROPERTY_NOOP_SINK_TOPICS, JUNIT_PROTOCOL_CODER_TOPIC);
- List<? extends TopicSink> noopTopics = ProxyTopicEndpointManager.getInstance().addTopicSinks(sinkConfig);
+ List<? extends TopicSink> noopTopics = TopicEndpoint.manager.addTopicSinks(sinkConfig);
Properties droolsControllerConfig = new Properties();
droolsControllerConfig.put(DroolsProperties.RULES_GROUPID, releaseId.getGroupId());
diff --git a/policy-management/src/test/java/org/onap/policy/drools/server/restful/test/RestManagerTest.java b/policy-management/src/test/java/org/onap/policy/drools/server/restful/test/RestManagerTest.java
index 0f57cd97..e96a4b91 100644
--- a/policy-management/src/test/java/org/onap/policy/drools/server/restful/test/RestManagerTest.java
+++ b/policy-management/src/test/java/org/onap/policy/drools/server/restful/test/RestManagerTest.java
@@ -43,7 +43,7 @@ import org.junit.BeforeClass;
import org.junit.FixMethodOrder;
import org.junit.Test;
import org.junit.runners.MethodSorters;
-import org.onap.policy.common.endpoints.event.comm.impl.ProxyTopicEndpointManager;
+import org.onap.policy.common.endpoints.event.comm.TopicEndpoint;
import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
import org.onap.policy.drools.persistence.SystemPersistence;
import org.onap.policy.drools.system.PolicyController;
@@ -65,10 +65,10 @@ public class RestManagerTest {
private static final String UEB_SOURCE_SERVER_PROPERTY = PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS + "."
+ UEB_TOPIC + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX;
- private static final String UEB_SINK_SERVER_PROPERTY = PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS + "." + UEB_TOPIC
- + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX;
- private static final String DMAAP_SOURCE_SERVER_PROPERTY = PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
- + DMAAP_TOPIC + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX;
+ private static final String UEB_SINK_SERVER_PROPERTY = PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS + "."
+ + UEB_TOPIC + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX;
+ private static final String DMAAP_SOURCE_SERVER_PROPERTY = PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
+ + "." + DMAAP_TOPIC + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX;
private static final String DMAAP_SINK_SERVER_PROPERTY = PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "."
+ DMAAP_TOPIC + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX;
private static final String UEB_SERVER = "localhost";
@@ -81,10 +81,10 @@ public class RestManagerTest {
private static final String DMAAP_SOURCE_PASSWD_KEY = PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
+ DMAAP_TOPIC + PolicyEndPointProperties.PROPERTY_TOPIC_AAF_PASSWORD_SUFFIX;
- private static final String DMAAP_SINK_MECHID_KEY = PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + DMAAP_TOPIC
- + PolicyEndPointProperties.PROPERTY_TOPIC_AAF_MECHID_SUFFIX;
- private static final String DMAAP_SINK_PASSWD_KEY = PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + DMAAP_TOPIC
- + PolicyEndPointProperties.PROPERTY_TOPIC_AAF_PASSWORD_SUFFIX;
+ private static final String DMAAP_SINK_MECHID_KEY = PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "."
+ + DMAAP_TOPIC + PolicyEndPointProperties.PROPERTY_TOPIC_AAF_MECHID_SUFFIX;
+ private static final String DMAAP_SINK_PASSWD_KEY = PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "."
+ + DMAAP_TOPIC + PolicyEndPointProperties.PROPERTY_TOPIC_AAF_PASSWORD_SUFFIX;
private static final String FOO_CONTROLLER_FILE = FOO_CONTROLLER + "-controller.properties";
@@ -137,7 +137,7 @@ public class RestManagerTest {
public static void tearDown() throws IOException, InterruptedException {
/* Shutdown managed resources */
PolicyController.factory.shutdown();
- ProxyTopicEndpointManager.getInstance().shutdown();
+ TopicEndpoint.manager.shutdown();
PolicyEngine.manager.stop();
Thread.sleep(10000L);
client.close();
diff --git a/policy-management/src/test/java/org/onap/policy/drools/system/test/PolicyEngineTest.java b/policy-management/src/test/java/org/onap/policy/drools/system/test/PolicyEngineTest.java
index 88d25563..c6a4ffb8 100644
--- a/policy-management/src/test/java/org/onap/policy/drools/system/test/PolicyEngineTest.java
+++ b/policy-management/src/test/java/org/onap/policy/drools/system/test/PolicyEngineTest.java
@@ -34,9 +34,9 @@ import org.junit.BeforeClass;
import org.junit.FixMethodOrder;
import org.junit.Test;
import org.junit.runners.MethodSorters;
+import org.onap.policy.common.endpoints.event.comm.TopicEndpoint;
import org.onap.policy.common.endpoints.event.comm.TopicSink;
-import org.onap.policy.common.endpoints.event.comm.bus.impl.IndexedNoopTopicSinkFactory;
-import org.onap.policy.common.endpoints.event.comm.impl.ProxyTopicEndpointManager;
+import org.onap.policy.common.endpoints.event.comm.bus.NoopTopicSink;
import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
import org.onap.policy.drools.persistence.SystemPersistence;
import org.onap.policy.drools.properties.DroolsProperties;
@@ -198,7 +198,7 @@ public class PolicyEngineTest {
final Properties noopSinkProperties = new Properties();
noopSinkProperties.put(PolicyEndPointProperties.PROPERTY_NOOP_SINK_TOPICS, NOOP_TOPIC);
- ProxyTopicEndpointManager.getInstance().addTopicSinks(noopSinkProperties).get(0).start();
+ TopicEndpoint.manager.addTopicSinks(noopSinkProperties).get(0).start();
EventProtocolCoder.manager.addEncoder(ENCODER_GROUP, ENCODER_ARTIFACT, NOOP_TOPIC,
DroolsConfiguration.class.getCanonicalName(), new JsonProtocolFilter(), null, null,
@@ -207,7 +207,7 @@ public class PolicyEngineTest {
assertTrue(PolicyEngine.manager.deliver(NOOP_TOPIC,
new DroolsConfiguration(ENCODER_GROUP, ENCODER_ARTIFACT, ENCODER_VERSION)));
- final TopicSink sink = IndexedNoopTopicSinkFactory.getInstance().get(NOOP_TOPIC);
+ final TopicSink sink = NoopTopicSink.factory.get(NOOP_TOPIC);
assertTrue(sink.getRecentEvents()[0].contains(ENCODER_GROUP));
assertTrue(sink.getRecentEvents()[0].contains(ENCODER_ARTIFACT));
@@ -280,7 +280,7 @@ public class PolicyEngineTest {
/* Shutdown managed resources */
PolicyController.factory.shutdown();
- ProxyTopicEndpointManager.getInstance().shutdown();
+ TopicEndpoint.manager.shutdown();
PolicyEngine.manager.stop();
Thread.sleep(10000L);