From e173fc5ab13b095d8f70fd8a8d4d063adeba6e6b Mon Sep 17 00:00:00 2001 From: Jim Hahn Date: Fri, 5 Jul 2019 10:52:20 -0400 Subject: Fix sonar issues in policy/endpoints Sonar fixes, other than code coverage. These changes are disruptive and will likely cause breakage in a number of policy repos. Renamed constants. Moved constants/factories from interfaces to classes. Change-Id: I182d50320aa6b53e383081af806c60dd2f806cbe Issue-ID: POLICY-1791 Signed-off-by: Jim Hahn --- .../common/endpoints/event/comm/TopicEndpoint.java | 5 -- .../endpoints/event/comm/TopicEndpointManager.java | 37 +++++++++++++ .../endpoints/event/comm/TopicEndpointProxy.java | 63 +++++++++++----------- .../endpoints/event/comm/bus/BusTopicSink.java | 14 ++--- .../endpoints/event/comm/bus/BusTopicSource.java | 34 +++--------- .../event/comm/bus/DmaapTopicFactories.java | 43 +++++++++++++++ .../endpoints/event/comm/bus/DmaapTopicSink.java | 11 ++-- .../event/comm/bus/DmaapTopicSinkFactory.java | 9 ---- .../endpoints/event/comm/bus/DmaapTopicSource.java | 10 ++-- .../event/comm/bus/DmaapTopicSourceFactory.java | 32 +++++------ .../comm/bus/IndexedDmaapTopicSourceFactory.java | 8 +-- .../comm/bus/IndexedUebTopicSourceFactory.java | 8 +-- .../event/comm/bus/NoopTopicFactories.java | 43 +++++++++++++++ .../endpoints/event/comm/bus/NoopTopicSink.java | 5 -- .../endpoints/event/comm/bus/NoopTopicSource.java | 5 -- .../event/comm/bus/UebTopicFactories.java | 43 +++++++++++++++ .../endpoints/event/comm/bus/UebTopicSink.java | 10 ++-- .../endpoints/event/comm/bus/UebTopicSource.java | 10 ++-- .../event/comm/bus/internal/BusConsumer.java | 43 ++++++++++----- .../event/comm/bus/internal/BusPublisher.java | 10 ++-- .../bus/internal/SingleThreadedBusTopicSource.java | 9 ++-- .../event/comm/client/TopicSinkClient.java | 4 +- 22 files changed, 284 insertions(+), 172 deletions(-) create mode 100644 policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicEndpointManager.java create mode 100644 policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicFactories.java create mode 100644 policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/NoopTopicFactories.java create mode 100644 policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicFactories.java (limited to 'policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event') diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicEndpoint.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicEndpoint.java index ff8b9513..7bc7abab 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicEndpoint.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicEndpoint.java @@ -37,11 +37,6 @@ import org.onap.policy.common.endpoints.event.comm.bus.UebTopicSource; */ public interface TopicEndpoint extends Startable, Lockable { - /** - * singleton for global access. - */ - TopicEndpoint manager = new TopicEndpointProxy(); - /** * Add topics configuration (sources and sinks) into a single list. * diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicEndpointManager.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicEndpointManager.java new file mode 100644 index 00000000..c390afc6 --- /dev/null +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicEndpointManager.java @@ -0,0 +1,37 @@ +/* + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.common.endpoints.event.comm; + +import lombok.Getter; + +public class TopicEndpointManager { + + /** + * Topic endpoint manager. + */ + @Getter + private static TopicEndpoint manager = new TopicEndpointProxy(); + + + private TopicEndpointManager() { + // do nothing + } +} diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicEndpointProxy.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicEndpointProxy.java index 806d077c..00980fc4 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicEndpointProxy.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicEndpointProxy.java @@ -25,10 +25,13 @@ import java.util.ArrayList; import java.util.List; import java.util.Properties; import org.onap.policy.common.capabilities.Startable; +import org.onap.policy.common.endpoints.event.comm.bus.DmaapTopicFactories; import org.onap.policy.common.endpoints.event.comm.bus.DmaapTopicSink; import org.onap.policy.common.endpoints.event.comm.bus.DmaapTopicSource; +import org.onap.policy.common.endpoints.event.comm.bus.NoopTopicFactories; import org.onap.policy.common.endpoints.event.comm.bus.NoopTopicSink; import org.onap.policy.common.endpoints.event.comm.bus.NoopTopicSource; +import org.onap.policy.common.endpoints.event.comm.bus.UebTopicFactories; import org.onap.policy.common.endpoints.event.comm.bus.UebTopicSink; import org.onap.policy.common.endpoints.event.comm.bus.UebTopicSource; import org.onap.policy.common.gson.annotation.GsonJsonIgnore; @@ -71,9 +74,9 @@ class TopicEndpointProxy implements TopicEndpoint { List sources = new ArrayList<>(); - sources.addAll(UebTopicSource.factory.build(properties)); - sources.addAll(DmaapTopicSource.factory.build(properties)); - sources.addAll(NoopTopicSource.factory.build(properties)); + sources.addAll(UebTopicFactories.getSourceFactory().build(properties)); + sources.addAll(DmaapTopicFactories.getSourceFactory().build(properties)); + sources.addAll(NoopTopicFactories.getSourceFactory().build(properties)); if (this.isLocked()) { for (final TopicSource source : sources) { @@ -92,9 +95,9 @@ class TopicEndpointProxy implements TopicEndpoint { final List sinks = new ArrayList<>(); - sinks.addAll(UebTopicSink.factory.build(properties)); - sinks.addAll(DmaapTopicSink.factory.build(properties)); - sinks.addAll(NoopTopicSink.factory.build(properties)); + sinks.addAll(UebTopicFactories.getSinkFactory().build(properties)); + sinks.addAll(DmaapTopicFactories.getSinkFactory().build(properties)); + sinks.addAll(NoopTopicFactories.getSinkFactory().build(properties)); if (this.isLocked()) { for (final TopicSink sink : sinks) { @@ -110,9 +113,9 @@ class TopicEndpointProxy implements TopicEndpoint { final List sources = new ArrayList<>(); - sources.addAll(UebTopicSource.factory.inventory()); - sources.addAll(DmaapTopicSource.factory.inventory()); - sources.addAll(NoopTopicSource.factory.inventory()); + sources.addAll(UebTopicFactories.getSourceFactory().inventory()); + sources.addAll(DmaapTopicFactories.getSourceFactory().inventory()); + sources.addAll(NoopTopicFactories.getSourceFactory().inventory()); return sources; } @@ -161,9 +164,9 @@ class TopicEndpointProxy implements TopicEndpoint { final List sinks = new ArrayList<>(); - sinks.addAll(UebTopicSink.factory.inventory()); - sinks.addAll(DmaapTopicSink.factory.inventory()); - sinks.addAll(NoopTopicSink.factory.inventory()); + sinks.addAll(UebTopicFactories.getSinkFactory().inventory()); + sinks.addAll(DmaapTopicFactories.getSinkFactory().inventory()); + sinks.addAll(NoopTopicFactories.getSinkFactory().inventory()); return sinks; } @@ -240,42 +243,42 @@ class TopicEndpointProxy implements TopicEndpoint { @GsonJsonIgnore @Override public List getUebTopicSources() { - return UebTopicSource.factory.inventory(); + return UebTopicFactories.getSourceFactory().inventory(); } @JsonIgnore @GsonJsonIgnore @Override public List getDmaapTopicSources() { - return DmaapTopicSource.factory.inventory(); + return DmaapTopicFactories.getSourceFactory().inventory(); } @JsonIgnore @GsonJsonIgnore @Override public List getNoopTopicSources() { - return NoopTopicSource.factory.inventory(); + return NoopTopicFactories.getSourceFactory().inventory(); } @JsonIgnore @GsonJsonIgnore @Override public List getUebTopicSinks() { - return UebTopicSink.factory.inventory(); + return UebTopicFactories.getSinkFactory().inventory(); } @JsonIgnore @GsonJsonIgnore @Override public List getDmaapTopicSinks() { - return DmaapTopicSink.factory.inventory(); + return DmaapTopicFactories.getSinkFactory().inventory(); } @JsonIgnore @GsonJsonIgnore @Override public List getNoopTopicSinks() { - return NoopTopicSink.factory.inventory(); + return NoopTopicFactories.getSinkFactory().inventory(); } @Override @@ -354,14 +357,14 @@ class TopicEndpointProxy implements TopicEndpoint { public void shutdown() { this.stop(); - UebTopicSource.factory.destroy(); - UebTopicSink.factory.destroy(); + UebTopicFactories.getSourceFactory().destroy(); + UebTopicFactories.getSinkFactory().destroy(); - DmaapTopicSource.factory.destroy(); - DmaapTopicSink.factory.destroy(); + DmaapTopicFactories.getSourceFactory().destroy(); + DmaapTopicFactories.getSinkFactory().destroy(); - NoopTopicSink.factory.destroy(); - NoopTopicSource.factory.destroy(); + NoopTopicFactories.getSinkFactory().destroy(); + NoopTopicFactories.getSourceFactory().destroy(); } @@ -465,32 +468,32 @@ class TopicEndpointProxy implements TopicEndpoint { @Override public UebTopicSource getUebTopicSource(String topicName) { - return UebTopicSource.factory.get(topicName); + return UebTopicFactories.getSourceFactory().get(topicName); } @Override public UebTopicSink getUebTopicSink(String topicName) { - return UebTopicSink.factory.get(topicName); + return UebTopicFactories.getSinkFactory().get(topicName); } @Override public DmaapTopicSource getDmaapTopicSource(String topicName) { - return DmaapTopicSource.factory.get(topicName); + return DmaapTopicFactories.getSourceFactory().get(topicName); } @Override public NoopTopicSource getNoopTopicSource(String topicName) { - return NoopTopicSource.factory.get(topicName); + return NoopTopicFactories.getSourceFactory().get(topicName); } @Override public DmaapTopicSink getDmaapTopicSink(String topicName) { - return DmaapTopicSink.factory.get(topicName); + return DmaapTopicFactories.getSinkFactory().get(topicName); } @Override public NoopTopicSink getNoopTopicSink(String topicName) { - return NoopTopicSink.factory.get(topicName); + return NoopTopicFactories.getSinkFactory().get(topicName); } private IllegalArgumentException parmException(String topicName) { diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/BusTopicSink.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/BusTopicSink.java index ed796585..e77beea1 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/BusTopicSink.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/BusTopicSink.java @@ -2,14 +2,14 @@ * ============LICENSE_START======================================================= * policy-endpoints * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * Copyright (C) 2017, 2019 AT&T Intellectual Property. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -26,21 +26,17 @@ import org.onap.policy.common.endpoints.event.comm.TopicSink; * Topic Sink over Bus Infrastructure (DMAAP/UEB). */ public interface BusTopicSink extends ApiKeyEnabled, TopicSink { - /** - * Log Failures after X number of retries. - */ - public static final int DEFAULT_LOG_SEND_FAILURES_AFTER = 1; /** * Sets the UEB partition key for published messages. - * + * * @param partitionKey the partition key */ public void setPartitionKey(String partitionKey); /** * Return the partition key in used by the system to publish messages. - * + * * @return the partition key */ public String getPartitionKey(); diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/BusTopicSource.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/BusTopicSource.java index 5ab4d46f..cd9bc015 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/BusTopicSource.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/BusTopicSource.java @@ -2,14 +2,14 @@ * ============LICENSE_START======================================================= * policy-endpoints * ================================================================================ - * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved. + * Copyright (C) 2017-2019 AT&T Intellectual Property. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -28,50 +28,30 @@ import org.onap.policy.common.endpoints.event.comm.TopicSource; */ public interface BusTopicSource extends ApiKeyEnabled, TopicSource { - /** - * Default Timeout fetching in milliseconds. - */ - public static int DEFAULT_TIMEOUT_MS_FETCH = 15000; - - /** - * Default maximum number of messages fetch at the time. - */ - public static int DEFAULT_LIMIT_FETCH = 100; - - /** - * Definition of No Timeout fetching. - */ - public static int NO_TIMEOUT_MS_FETCH = -1; - - /** - * Definition of No limit fetching. - */ - public static int NO_LIMIT_FETCH = -1; - /** * Gets the consumer group. - * + * * @return consumer group */ public String getConsumerGroup(); /** * Gets the consumer instance. - * + * * @return consumer instance */ public String getConsumerInstance(); /** * Gets the fetch timeout. - * + * * @return fetch timeout */ public int getFetchTimeout(); /** * Gets the fetch limit. - * + * * @return fetch limit */ public int getFetchLimit(); diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicFactories.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicFactories.java new file mode 100644 index 00000000..d5a46f8f --- /dev/null +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicFactories.java @@ -0,0 +1,43 @@ +/* + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.common.endpoints.event.comm.bus; + +import lombok.Getter; + +public class DmaapTopicFactories { + + /** + * Factory for instantiation and management of sinks. + */ + @Getter + private static final DmaapTopicSinkFactory sinkFactory = new IndexedDmaapTopicSinkFactory(); + + /** + * Factory for instantiation and management of sources. + */ + @Getter + private static final DmaapTopicSourceFactory sourceFactory = new IndexedDmaapTopicSourceFactory(); + + + private DmaapTopicFactories() { + // do nothing + } +} diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSink.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSink.java index fc1587e4..805ed108 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSink.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSink.java @@ -2,14 +2,14 @@ * ============LICENSE_START======================================================= * policy-endpoints * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * Copyright (C) 2017, 2019 AT&T Intellectual Property. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -22,9 +22,4 @@ package org.onap.policy.common.endpoints.event.comm.bus; public interface DmaapTopicSink extends BusTopicSink { - /** - * Factory of UebTopicWriter for instantiation and management purposes. - */ - - public static final DmaapTopicSinkFactory factory = new IndexedDmaapTopicSinkFactory(); } diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSinkFactory.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSinkFactory.java index a3eb4df6..4409e827 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSinkFactory.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSinkFactory.java @@ -30,15 +30,6 @@ import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams; */ public interface DmaapTopicSinkFactory { - String DME2_READ_TIMEOUT_PROPERTY = "AFT_DME2_EP_READ_TIMEOUT_MS"; - String DME2_EP_CONN_TIMEOUT_PROPERTY = "AFT_DME2_EP_CONN_TIMEOUT"; - String DME2_ROUNDTRIP_TIMEOUT_PROPERTY = "AFT_DME2_ROUNDTRIP_TIMEOUT_MS"; - String DME2_VERSION_PROPERTY = "Version"; - String DME2_ROUTE_OFFER_PROPERTY = "routeOffer"; - String DME2_SERVICE_NAME_PROPERTY = "ServiceName"; - String DME2_SUBCONTEXT_PATH_PROPERTY = "SubContextPath"; - String DME2_SESSION_STICKINESS_REQUIRED_PROPERTY = "sessionstickinessrequired"; - /** *
      * Instantiate a new DMAAP Topic Sink, with following params.
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSource.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSource.java
index 2cf07bda..9893fa15 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSource.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSource.java
@@ -2,14 +2,14 @@
  * ============LICENSE_START=======================================================
  * policy-endpoints
  * ================================================================================
- * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2017, 2019 AT&T Intellectual Property. All rights reserved.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
- * 
+ *
  *      http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -22,8 +22,4 @@ package org.onap.policy.common.endpoints.event.comm.bus;
 
 public interface DmaapTopicSource extends BusTopicSource {
 
-    /**
-     * factory for managing and tracking DMAAP sources.
-     */
-    public static DmaapTopicSourceFactory factory = new IndexedDmaapTopicSourceFactory();
 }
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSourceFactory.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSourceFactory.java
index 35a79bf1..7b1f185b 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSourceFactory.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSourceFactory.java
@@ -8,9 +8,9 @@
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
- * 
+ *
  *      http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -29,20 +29,12 @@ import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams;
  * DMAAP Topic Source Factory.
  */
 public interface DmaapTopicSourceFactory {
-    String DME2_READ_TIMEOUT_PROPERTY = "AFT_DME2_EP_READ_TIMEOUT_MS";
-    String DME2_EP_CONN_TIMEOUT_PROPERTY = "AFT_DME2_EP_CONN_TIMEOUT";
-    String DME2_ROUNDTRIP_TIMEOUT_PROPERTY = "AFT_DME2_ROUNDTRIP_TIMEOUT_MS";
-    String DME2_VERSION_PROPERTY = "Version";
-    String DME2_ROUTE_OFFER_PROPERTY = "routeOffer";
-    String DME2_SERVICE_NAME_PROPERTY = "ServiceName";
-    String DME2_SUBCONTEXT_PATH_PROPERTY = "SubContextPath";
-    String DME2_SESSION_STICKINESS_REQUIRED_PROPERTY = "sessionstickinessrequired";
 
     /**
      * Creates an DMAAP Topic Source based on properties files.
-     * 
+     *
      * @param properties Properties containing initialization values
-     * 
+     *
      * @return an DMAAP Topic Source
      * @throws IllegalArgumentException if invalid parameters are present
      */
@@ -50,7 +42,7 @@ public interface DmaapTopicSourceFactory {
 
     /**
      * Instantiates a new DMAAP Topic Source.
-     * 
+     *
      * @param busTopicParams parameters object
      * @return a DMAAP Topic Source
      */
@@ -58,12 +50,12 @@ public interface DmaapTopicSourceFactory {
 
     /**
      * Instantiates a new DMAAP Topic Source.
-     * 
+     *
      * @param servers list of servers
      * @param topic topic name
      * @param apiKey API Key
      * @param apiSecret API Secret
-     * 
+     *
      * @return an DMAAP Topic Source
      * @throws IllegalArgumentException if invalid parameters are present
      */
@@ -71,10 +63,10 @@ public interface DmaapTopicSourceFactory {
 
     /**
      * Instantiates a new DMAAP Topic Source.
-     * 
+     *
      * @param servers list of servers
      * @param topic topic name
-     * 
+     *
      * @return an DMAAP Topic Source
      * @throws IllegalArgumentException if invalid parameters are present
      */
@@ -82,7 +74,7 @@ public interface DmaapTopicSourceFactory {
 
     /**
      * Destroys an DMAAP Topic Source based on a topic.
-     * 
+     *
      * @param topic topic name
      * @throws IllegalArgumentException if invalid parameters are present
      */
@@ -95,7 +87,7 @@ public interface DmaapTopicSourceFactory {
 
     /**
      * Gets an DMAAP Topic Source based on topic name.
-     * 
+     *
      * @param topic the topic name
      * @return an DMAAP Topic Source with topic name
      * @throws IllegalArgumentException if an invalid topic is provided
@@ -105,7 +97,7 @@ public interface DmaapTopicSourceFactory {
 
     /**
      * Provides a snapshot of the DMAAP Topic Sources.
-     * 
+     *
      * @return a list of the DMAAP Topic Sources
      */
     List inventory();
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedDmaapTopicSourceFactory.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedDmaapTopicSourceFactory.java
index c895a409..d7f4695e 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedDmaapTopicSourceFactory.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedDmaapTopicSourceFactory.java
@@ -96,8 +96,8 @@ class IndexedDmaapTopicSourceFactory implements DmaapTopicSourceFactory {
                 .topic(topic)
                 .apiKey(apiKey)
                 .apiSecret(apiSecret)
-                .fetchTimeout(DmaapTopicSource.DEFAULT_TIMEOUT_MS_FETCH)
-                .fetchLimit(DmaapTopicSource.DEFAULT_LIMIT_FETCH)
+                .fetchTimeout(PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH)
+                .fetchLimit(PolicyEndPointProperties.DEFAULT_LIMIT_FETCH)
                 .managed(true)
                 .useHttps(false)
                 .allowSelfSignedCerts(false)
@@ -133,9 +133,9 @@ class IndexedDmaapTopicSourceFactory implements DmaapTopicSourceFactory {
                                 PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_INSTANCE_SUFFIX, null))
                 .fetchTimeout(props.getInteger(
                                 PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_FETCH_TIMEOUT_SUFFIX,
-                                DmaapTopicSource.DEFAULT_TIMEOUT_MS_FETCH))
+                                PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH))
                 .fetchLimit(props.getInteger(PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_FETCH_LIMIT_SUFFIX,
-                                DmaapTopicSource.DEFAULT_LIMIT_FETCH))
+                                PolicyEndPointProperties.DEFAULT_LIMIT_FETCH))
                 .build());
 
         dmaapTopicSourceLst.add(uebTopicSource);
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedUebTopicSourceFactory.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedUebTopicSourceFactory.java
index 6655aa12..5bdc8ab6 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedUebTopicSourceFactory.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedUebTopicSourceFactory.java
@@ -100,8 +100,8 @@ class IndexedUebTopicSourceFactory implements UebTopicSourceFactory {
                 .topic(topic)
                 .apiKey(apiKey)
                 .apiSecret(apiSecret)
-                .fetchTimeout(UebTopicSource.DEFAULT_TIMEOUT_MS_FETCH)
-                .fetchLimit(UebTopicSource.DEFAULT_LIMIT_FETCH)
+                .fetchTimeout(PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH)
+                .fetchLimit(PolicyEndPointProperties.DEFAULT_LIMIT_FETCH)
                 .managed(true)
                 .useHttps(false)
                 .allowSelfSignedCerts(true).build());
@@ -136,9 +136,9 @@ class IndexedUebTopicSourceFactory implements UebTopicSourceFactory {
                                 PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_INSTANCE_SUFFIX, null))
                 .fetchTimeout(props.getInteger(
                                 PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_FETCH_TIMEOUT_SUFFIX,
-                                UebTopicSource.DEFAULT_TIMEOUT_MS_FETCH))
+                                PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH))
                 .fetchLimit(props.getInteger(PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_FETCH_LIMIT_SUFFIX,
-                                UebTopicSource.DEFAULT_LIMIT_FETCH))
+                                PolicyEndPointProperties.DEFAULT_LIMIT_FETCH))
                 .build());
 
         newUebTopicSources.add(uebTopicSource);
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/NoopTopicFactories.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/NoopTopicFactories.java
new file mode 100644
index 00000000..aa85e714
--- /dev/null
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/NoopTopicFactories.java
@@ -0,0 +1,43 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.common.endpoints.event.comm.bus;
+
+import lombok.Getter;
+
+public class NoopTopicFactories {
+
+    /**
+     * Factory for instantiation and management of sinks.
+     */
+    @Getter
+    private static final NoopTopicSinkFactory sinkFactory = new NoopTopicSinkFactory();
+
+    /**
+     * Factory for instantiation and management of sources.
+     */
+    @Getter
+    private static final NoopTopicSourceFactory sourceFactory = new NoopTopicSourceFactory();
+
+
+    private NoopTopicFactories() {
+        // do nothing
+    }
+}
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/NoopTopicSink.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/NoopTopicSink.java
index c52a30be..e7accad5 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/NoopTopicSink.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/NoopTopicSink.java
@@ -29,11 +29,6 @@ import org.onap.policy.common.endpoints.utils.NetLoggerUtil.EventType;
  */
 public class NoopTopicSink extends NoopTopicEndpoint implements TopicSink {
 
-    /**
-     * Factory.
-     */
-    public static final NoopTopicSinkFactory factory = new NoopTopicSinkFactory();
-
     /**
      * Constructs the object.
      */
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/NoopTopicSource.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/NoopTopicSource.java
index a5b9349e..6f2c4a1e 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/NoopTopicSource.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/NoopTopicSource.java
@@ -29,11 +29,6 @@ import org.onap.policy.common.endpoints.utils.NetLoggerUtil.EventType;
  */
 public class NoopTopicSource extends NoopTopicEndpoint implements TopicSource {
 
-    /**
-     * Factory.
-     */
-    public static final NoopTopicSourceFactory factory = new NoopTopicSourceFactory();
-
     /**
      * Constructs the object.
      */
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicFactories.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicFactories.java
new file mode 100644
index 00000000..d02758be
--- /dev/null
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicFactories.java
@@ -0,0 +1,43 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.common.endpoints.event.comm.bus;
+
+import lombok.Getter;
+
+public class UebTopicFactories {
+
+    /**
+     * Factory for instantiation and management of sinks.
+     */
+    @Getter
+    private static final UebTopicSinkFactory sinkFactory = new IndexedUebTopicSinkFactory();
+
+    /**
+     * Factory for instantiation and management of sources.
+     */
+    @Getter
+    private static final UebTopicSourceFactory sourceFactory = new IndexedUebTopicSourceFactory();
+
+
+    private UebTopicFactories() {
+        // do nothing
+    }
+}
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSink.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSink.java
index bc1251d5..acfef6da 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSink.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSink.java
@@ -2,14 +2,14 @@
  * ============LICENSE_START=======================================================
  * policy-endpoints
  * ================================================================================
- * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2017, 2019 AT&T Intellectual Property. All rights reserved.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
- * 
+ *
  *      http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -25,8 +25,4 @@ package org.onap.policy.common.endpoints.event.comm.bus;
  */
 public interface UebTopicSink extends BusTopicSink {
 
-    /**
-     * Factory of UEB Topic Sinks for instantiation and management purposes.
-     */
-    public static final UebTopicSinkFactory factory = new IndexedUebTopicSinkFactory();
 }
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSource.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSource.java
index ee4f013b..56534309 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSource.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSource.java
@@ -2,14 +2,14 @@
  * ============LICENSE_START=======================================================
  * policy-endpoints
  * ================================================================================
- * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2017, 2019 AT&T Intellectual Property. All rights reserved.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
- * 
+ *
  *      http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -26,8 +26,4 @@ package org.onap.policy.common.endpoints.event.comm.bus;
  */
 public interface UebTopicSource extends BusTopicSource {
 
-    /**
-     * factory for managing and tracking UEB readers.
-     */
-    public static UebTopicSourceFactory factory = new IndexedUebTopicSourceFactory();
 }
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java
index abf793d6..b66b4ba5 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java
@@ -24,7 +24,6 @@ package org.onap.policy.common.endpoints.event.comm.bus.internal;
 import com.att.nsa.cambria.client.CambriaClientBuilders;
 import com.att.nsa.cambria.client.CambriaClientBuilders.ConsumerBuilder;
 import com.att.nsa.cambria.client.CambriaConsumer;
-
 import java.io.IOException;
 import java.net.MalformedURLException;
 import java.security.GeneralSecurityException;
@@ -38,7 +37,6 @@ import org.onap.dmaap.mr.client.MRClientFactory;
 import org.onap.dmaap.mr.client.impl.MRConsumerImpl;
 import org.onap.dmaap.mr.client.response.MRConsumerResponse;
 import org.onap.dmaap.mr.test.clients.ProtocolTypeConstants;
-import org.onap.policy.common.endpoints.event.comm.bus.DmaapTopicSinkFactory;
 import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -52,9 +50,9 @@ public interface BusConsumer {
      * fetch messages.
      *
      * @return list of messages
-     * @throws Exception when error encountered by underlying libraries
+     * @throws IOException when error encountered by underlying libraries
      */
-    public Iterable fetch() throws InterruptedException, IOException;
+    public Iterable fetch() throws IOException;
 
     /**
      * close underlying library consumer.
@@ -168,16 +166,24 @@ public interface BusConsumer {
         }
 
         @Override
-        public Iterable fetch() throws IOException, InterruptedException {
+        public Iterable fetch() throws IOException {
             try {
                 return getCurrentConsumer().fetch();
             } catch (final IOException e) {
                 logger.error("{}: cannot fetch because of {} - backoff for {} ms.", this, e.getMessage(),
-                        this.fetchTimeout, e);
+                        this.fetchTimeout);
+                sleepAfterFetchFailure();
+                throw e;
+            }
+        }
 
+        private void sleepAfterFetchFailure() {
+            try {
                 this.closeCondition.await(this.fetchTimeout, TimeUnit.MILLISECONDS);
 
-                throw e;
+            } catch (InterruptedException e) {
+                logger.warn("{}: interrupted while handling fetch error", this, e);
+                Thread.currentThread().interrupt();
             }
         }
 
@@ -306,12 +312,12 @@ public interface BusConsumer {
         }
 
         @Override
-        public Iterable fetch() throws InterruptedException, IOException {
+        public Iterable fetch() throws IOException {
             final MRConsumerResponse response = this.consumer.fetchWithReturnConsumerResponse();
             if (response == null) {
                 logger.warn("{}: DMaaP NULL response received", this);
 
-                closeCondition.await(fetchTimeout, TimeUnit.MILLISECONDS);
+                sleepAfterFetchFailure();
                 return new ArrayList<>();
             } else {
                 logger.debug("DMaaP consumer received {} : {}", response.getResponseCode(),
@@ -322,7 +328,7 @@ public interface BusConsumer {
                     logger.error("DMaaP consumer received: {} : {}", response.getResponseCode(),
                             response.getResponseMessage());
 
-                    closeCondition.await(fetchTimeout, TimeUnit.MILLISECONDS);
+                    sleepAfterFetchFailure();
 
                     /* fall through */
                 }
@@ -335,6 +341,16 @@ public interface BusConsumer {
             }
         }
 
+        private void sleepAfterFetchFailure() {
+            try {
+                this.closeCondition.await(this.fetchTimeout, TimeUnit.MILLISECONDS);
+
+            } catch (InterruptedException e) {
+                logger.warn("{}: interrupted while handling fetch error", this, e);
+                Thread.currentThread().interrupt();
+            }
+        }
+
         @Override
         public void close() {
             this.closeCondition.countDown();
@@ -434,7 +450,8 @@ public interface BusConsumer {
 
 
             final String dme2RouteOffer = (busTopicParams.isAdditionalPropsValid()
-                            ? busTopicParams.getAdditionalProps().get(DmaapTopicSinkFactory.DME2_ROUTE_OFFER_PROPERTY)
+                            ? busTopicParams.getAdditionalProps().get(
+                                            PolicyEndPointProperties.DME2_ROUTE_OFFER_PROPERTY)
                             : null);
 
             if (busTopicParams.isEnvironmentInvalid()) {
@@ -474,7 +491,7 @@ public interface BusConsumer {
 
             props = new Properties();
 
-            props.setProperty(DmaapTopicSinkFactory.DME2_SERVICE_NAME_PROPERTY, serviceName);
+            props.setProperty(PolicyEndPointProperties.DME2_SERVICE_NAME_PROPERTY, serviceName);
 
             props.setProperty("username", busTopicParams.getUserName());
             props.setProperty("password", busTopicParams.getPassword());
@@ -489,7 +506,7 @@ public interface BusConsumer {
                 props.setProperty("Partner", busTopicParams.getPartner());
             }
             if (dme2RouteOffer != null) {
-                props.setProperty(DmaapTopicSinkFactory.DME2_ROUTE_OFFER_PROPERTY, dme2RouteOffer);
+                props.setProperty(PolicyEndPointProperties.DME2_ROUTE_OFFER_PROPERTY, dme2RouteOffer);
             }
 
             props.setProperty("Latitude", busTopicParams.getLatitude());
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java
index 67adf3b4..469794c7 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java
@@ -26,7 +26,6 @@ import com.att.nsa.cambria.client.CambriaBatchingPublisher;
 import com.att.nsa.cambria.client.CambriaClientBuilders;
 import com.att.nsa.cambria.client.CambriaClientBuilders.PublisherBuilder;
 import com.fasterxml.jackson.annotation.JsonIgnore;
-
 import java.net.MalformedURLException;
 import java.security.GeneralSecurityException;
 import java.util.ArrayList;
@@ -34,12 +33,10 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.TimeUnit;
-
 import org.apache.commons.lang3.StringUtils;
 import org.onap.dmaap.mr.client.impl.MRSimplerBatchPublisher;
 import org.onap.dmaap.mr.client.response.MRPublisherResponse;
 import org.onap.dmaap.mr.test.clients.ProtocolTypeConstants;
-import org.onap.policy.common.endpoints.event.comm.bus.DmaapTopicSinkFactory;
 import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
 import org.onap.policy.common.gson.annotation.GsonJsonIgnore;
 import org.slf4j.Logger;
@@ -299,7 +296,8 @@ public interface BusPublisher {
                     busTopicParams.getUserName(),busTopicParams.getPassword(),busTopicParams.isUseHttps());
 
             String dme2RouteOffer = busTopicParams.isAdditionalPropsValid()
-                            ? busTopicParams.getAdditionalProps().get(DmaapTopicSinkFactory.DME2_ROUTE_OFFER_PROPERTY)
+                            ? busTopicParams.getAdditionalProps().get(
+                                            PolicyEndPointProperties.DME2_ROUTE_OFFER_PROPERTY)
                             : null;
 
             validateParams(busTopicParams, dme2RouteOffer);
@@ -310,13 +308,13 @@ public interface BusPublisher {
             props.setProperty("Environment", busTopicParams.getEnvironment());
             props.setProperty("AFT_ENVIRONMENT", busTopicParams.getAftEnvironment());
 
-            props.setProperty(DmaapTopicSinkFactory.DME2_SERVICE_NAME_PROPERTY, serviceName);
+            props.setProperty(PolicyEndPointProperties.DME2_SERVICE_NAME_PROPERTY, serviceName);
 
             if (busTopicParams.getPartner() != null) {
                 props.setProperty("Partner", busTopicParams.getPartner());
             }
             if (dme2RouteOffer != null) {
-                props.setProperty(DmaapTopicSinkFactory.DME2_ROUTE_OFFER_PROPERTY, dme2RouteOffer);
+                props.setProperty(PolicyEndPointProperties.DME2_ROUTE_OFFER_PROPERTY, dme2RouteOffer);
             }
 
             props.setProperty("Latitude", busTopicParams.getLatitude());
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedBusTopicSource.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedBusTopicSource.java
index 0953465b..164f2b16 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedBusTopicSource.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedBusTopicSource.java
@@ -29,6 +29,7 @@ import org.onap.policy.common.endpoints.event.comm.FilterableTopicSource;
 import org.onap.policy.common.endpoints.event.comm.TopicListener;
 import org.onap.policy.common.endpoints.event.comm.bus.BusTopicSource;
 import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer.FilterableBusConsumer;
+import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
 import org.onap.policy.common.endpoints.utils.NetLoggerUtil;
 import org.onap.policy.common.endpoints.utils.NetLoggerUtil.EventType;
 import org.onap.policy.common.utils.network.NetworkUtil;
@@ -103,13 +104,13 @@ public abstract class SingleThreadedBusTopicSource extends BusTopicBase
         }
 
         if (busTopicParams.getFetchTimeout() <= 0) {
-            this.fetchTimeout = NO_TIMEOUT_MS_FETCH;
+            this.fetchTimeout = PolicyEndPointProperties.NO_TIMEOUT_MS_FETCH;
         } else {
             this.fetchTimeout = busTopicParams.getFetchTimeout();
         }
 
         if (busTopicParams.getFetchLimit() <= 0) {
-            this.fetchLimit = NO_LIMIT_FETCH;
+            this.fetchLimit = PolicyEndPointProperties.NO_LIMIT_FETCH;
         } else {
             this.fetchLimit = busTopicParams.getFetchLimit();
         }
@@ -225,7 +226,7 @@ public abstract class SingleThreadedBusTopicSource extends BusTopicBase
         while (this.alive) {
             try {
                 fetchAllMessages();
-            } catch (Exception e) {
+            } catch (IOException | RuntimeException e) {
                 logger.error("{}: cannot fetch because of ", this, e.getMessage(), e);
             }
         }
@@ -233,7 +234,7 @@ public abstract class SingleThreadedBusTopicSource extends BusTopicBase
         logger.info("{}: exiting thread", this);
     }
 
-    private void fetchAllMessages() throws InterruptedException, IOException {
+    private void fetchAllMessages() throws IOException {
         for (String event : this.consumer.fetch()) {
             synchronized (this) {
                 this.recentEvents.add(event);
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/client/TopicSinkClient.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/client/TopicSinkClient.java
index f08a1381..9f8b3c06 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/client/TopicSinkClient.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/client/TopicSinkClient.java
@@ -24,7 +24,7 @@ package org.onap.policy.common.endpoints.event.comm.client;
 import java.util.List;
 import lombok.Getter;
 import lombok.NonNull;
-import org.onap.policy.common.endpoints.event.comm.TopicEndpoint;
+import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager;
 import org.onap.policy.common.endpoints.event.comm.TopicSink;
 import org.onap.policy.common.utils.coder.Coder;
 import org.onap.policy.common.utils.coder.CoderException;
@@ -109,6 +109,6 @@ public class TopicSinkClient {
      * @return the sinks for the topic
      */
     protected List getTopicSinks(final String topic) {
-        return TopicEndpoint.manager.getTopicSinks(topic);
+        return TopicEndpointManager.getManager().getTopicSinks(topic);
     }
 }
-- 
cgit 1.2.3-korg