aboutsummaryrefslogtreecommitdiffstats
path: root/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java
diff options
context:
space:
mode:
authorJim Hahn <jrh3@att.com>2021-05-21 17:31:02 -0400
committerJim Hahn <jrh3@att.com>2021-05-21 17:43:43 -0400
commit57410a85a607f71d4094dae9d9e57dabb7b0b33a (patch)
tree5a8195291c2bd3418b0148c4dc4f59033d4947e3 /policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java
parentc7d24b07377cb00ce4c0c531c6d3dff25e04dc12 (diff)
Fix sonar duplicate code issue
Issue-ID: POLICY-3284 Change-Id: I78c3a8ac92e18e2b0088eb07e27a4e97866d6182 Signed-off-by: Jim Hahn <jrh3@att.com>
Diffstat (limited to 'policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java')
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java104
1 files changed, 55 insertions, 49 deletions
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java
index 5fb3aedb..1e2c82b1 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* policy-endpoints
* ================================================================================
- * Copyright (C) 2017-2020 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2017-2021 AT&T Intellectual Property. All rights reserved.
* Modifications Copyright (C) 2018 Samsung Electronics Co., Ltd.
* Modifications Copyright (C) 2020 Bell Canada. All rights reserved.
* ================================================================================
@@ -62,9 +62,57 @@ public interface BusConsumer {
public void close();
/**
+ * Consumer that handles fetch() failures by sleeping.
+ */
+ public abstract static class FetchingBusConsumer implements BusConsumer {
+ private static Logger logger = LoggerFactory.getLogger(FetchingBusConsumer.class);
+
+ /**
+ * Fetch timeout.
+ */
+ protected int fetchTimeout;
+
+ /**
+ * Counted down when {@link #close()} is invoked.
+ */
+ private CountDownLatch closeCondition = new CountDownLatch(1);
+
+
+ /**
+ * Constructs the object.
+ *
+ * @param busTopicParams parameters for the bus topic
+ */
+ protected FetchingBusConsumer(BusTopicParams busTopicParams) {
+ this.fetchTimeout = busTopicParams.getFetchTimeout();
+ }
+
+ /**
+ * Causes the thread to sleep; invoked after fetch() fails. If the consumer is closed,
+ * or the thread is interrupted, then this will return immediately.
+ */
+ protected void sleepAfterFetchFailure() {
+ try {
+ if (this.closeCondition.await(this.fetchTimeout, TimeUnit.MILLISECONDS)) {
+ logger.info("{}: closed while handling fetch error", this);
+ }
+
+ } catch (InterruptedException e) {
+ logger.warn("{}: interrupted while handling fetch error", this, e);
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ @Override
+ public void close() {
+ this.closeCondition.countDown();
+ }
+ }
+
+ /**
* Cambria based consumer.
*/
- public static class CambriaConsumerWrapper implements BusConsumer {
+ public static class CambriaConsumerWrapper extends FetchingBusConsumer {
/**
* logger.
@@ -82,16 +130,6 @@ public interface BusConsumer {
private final CambriaConsumer consumer;
/**
- * fetch timeout.
- */
- protected int fetchTimeout;
-
- /**
- * close condition.
- */
- protected CountDownLatch closeCondition = new CountDownLatch(1);
-
- /**
* Cambria Consumer Wrapper.
* BusTopicParam object contains the following parameters
* servers messaging bus hosts.
@@ -108,8 +146,7 @@ public interface BusConsumer {
* @throws MalformedURLException - Malformed URL exception
*/
public CambriaConsumerWrapper(BusTopicParams busTopicParams) {
-
- this.fetchTimeout = busTopicParams.getFetchTimeout();
+ super(busTopicParams);
this.builder = new CambriaClientBuilders.ConsumerBuilder();
@@ -155,19 +192,9 @@ public interface BusConsumer {
}
}
- private void sleepAfterFetchFailure() {
- try {
- this.closeCondition.await(this.fetchTimeout, TimeUnit.MILLISECONDS); //NOSONAR
-
- } catch (InterruptedException e) {
- logger.warn("{}: interrupted while handling fetch error", this, e);
- Thread.currentThread().interrupt();
- }
- }
-
@Override
public void close() {
- this.closeCondition.countDown();
+ super.close();
this.consumer.close();
}
@@ -180,7 +207,7 @@ public interface BusConsumer {
/**
* MR based consumer.
*/
- public abstract class DmaapConsumerWrapper implements BusConsumer {
+ public abstract class DmaapConsumerWrapper extends FetchingBusConsumer {
/**
* logger.
@@ -193,16 +220,6 @@ public interface BusConsumer {
protected static final String PROTOCOL_PROP = "Protocol";
/**
- * fetch timeout.
- */
- protected int fetchTimeout;
-
- /**
- * close condition.
- */
- protected CountDownLatch closeCondition = new CountDownLatch(1);
-
- /**
* MR Consumer.
*/
protected MRConsumerImpl consumer;
@@ -225,8 +242,7 @@ public interface BusConsumer {
* @throws MalformedURLException URL should be valid
*/
protected DmaapConsumerWrapper(BusTopicParams busTopicParams) throws MalformedURLException {
-
- this.fetchTimeout = busTopicParams.getFetchTimeout();
+ super(busTopicParams);
if (busTopicParams.isTopicInvalid()) {
throw new IllegalArgumentException("No topic for DMaaP");
@@ -277,19 +293,9 @@ public interface BusConsumer {
}
}
- private void sleepAfterFetchFailure() {
- try {
- this.closeCondition.await(this.fetchTimeout, TimeUnit.MILLISECONDS); //NOSONAR
-
- } catch (InterruptedException e) {
- logger.warn("{}: interrupted while handling fetch error", this, e);
- Thread.currentThread().interrupt();
- }
- }
-
@Override
public void close() {
- this.closeCondition.countDown();
+ super.close();
this.consumer.close();
}