aboutsummaryrefslogtreecommitdiffstats
path: root/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java
diff options
context:
space:
mode:
Diffstat (limited to 'policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java')
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java43
1 files changed, 30 insertions, 13 deletions
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java
index abf793d6..b66b4ba5 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java
@@ -24,7 +24,6 @@ package org.onap.policy.common.endpoints.event.comm.bus.internal;
import com.att.nsa.cambria.client.CambriaClientBuilders;
import com.att.nsa.cambria.client.CambriaClientBuilders.ConsumerBuilder;
import com.att.nsa.cambria.client.CambriaConsumer;
-
import java.io.IOException;
import java.net.MalformedURLException;
import java.security.GeneralSecurityException;
@@ -38,7 +37,6 @@ import org.onap.dmaap.mr.client.MRClientFactory;
import org.onap.dmaap.mr.client.impl.MRConsumerImpl;
import org.onap.dmaap.mr.client.response.MRConsumerResponse;
import org.onap.dmaap.mr.test.clients.ProtocolTypeConstants;
-import org.onap.policy.common.endpoints.event.comm.bus.DmaapTopicSinkFactory;
import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -52,9 +50,9 @@ public interface BusConsumer {
* fetch messages.
*
* @return list of messages
- * @throws Exception when error encountered by underlying libraries
+ * @throws IOException when error encountered by underlying libraries
*/
- public Iterable<String> fetch() throws InterruptedException, IOException;
+ public Iterable<String> fetch() throws IOException;
/**
* close underlying library consumer.
@@ -168,16 +166,24 @@ public interface BusConsumer {
}
@Override
- public Iterable<String> fetch() throws IOException, InterruptedException {
+ public Iterable<String> fetch() throws IOException {
try {
return getCurrentConsumer().fetch();
} catch (final IOException e) {
logger.error("{}: cannot fetch because of {} - backoff for {} ms.", this, e.getMessage(),
- this.fetchTimeout, e);
+ this.fetchTimeout);
+ sleepAfterFetchFailure();
+ throw e;
+ }
+ }
+ private void sleepAfterFetchFailure() {
+ try {
this.closeCondition.await(this.fetchTimeout, TimeUnit.MILLISECONDS);
- throw e;
+ } catch (InterruptedException e) {
+ logger.warn("{}: interrupted while handling fetch error", this, e);
+ Thread.currentThread().interrupt();
}
}
@@ -306,12 +312,12 @@ public interface BusConsumer {
}
@Override
- public Iterable<String> fetch() throws InterruptedException, IOException {
+ public Iterable<String> fetch() throws IOException {
final MRConsumerResponse response = this.consumer.fetchWithReturnConsumerResponse();
if (response == null) {
logger.warn("{}: DMaaP NULL response received", this);
- closeCondition.await(fetchTimeout, TimeUnit.MILLISECONDS);
+ sleepAfterFetchFailure();
return new ArrayList<>();
} else {
logger.debug("DMaaP consumer received {} : {}", response.getResponseCode(),
@@ -322,7 +328,7 @@ public interface BusConsumer {
logger.error("DMaaP consumer received: {} : {}", response.getResponseCode(),
response.getResponseMessage());
- closeCondition.await(fetchTimeout, TimeUnit.MILLISECONDS);
+ sleepAfterFetchFailure();
/* fall through */
}
@@ -335,6 +341,16 @@ public interface BusConsumer {
}
}
+ private void sleepAfterFetchFailure() {
+ try {
+ this.closeCondition.await(this.fetchTimeout, TimeUnit.MILLISECONDS);
+
+ } catch (InterruptedException e) {
+ logger.warn("{}: interrupted while handling fetch error", this, e);
+ Thread.currentThread().interrupt();
+ }
+ }
+
@Override
public void close() {
this.closeCondition.countDown();
@@ -434,7 +450,8 @@ public interface BusConsumer {
final String dme2RouteOffer = (busTopicParams.isAdditionalPropsValid()
- ? busTopicParams.getAdditionalProps().get(DmaapTopicSinkFactory.DME2_ROUTE_OFFER_PROPERTY)
+ ? busTopicParams.getAdditionalProps().get(
+ PolicyEndPointProperties.DME2_ROUTE_OFFER_PROPERTY)
: null);
if (busTopicParams.isEnvironmentInvalid()) {
@@ -474,7 +491,7 @@ public interface BusConsumer {
props = new Properties();
- props.setProperty(DmaapTopicSinkFactory.DME2_SERVICE_NAME_PROPERTY, serviceName);
+ props.setProperty(PolicyEndPointProperties.DME2_SERVICE_NAME_PROPERTY, serviceName);
props.setProperty("username", busTopicParams.getUserName());
props.setProperty("password", busTopicParams.getPassword());
@@ -489,7 +506,7 @@ public interface BusConsumer {
props.setProperty("Partner", busTopicParams.getPartner());
}
if (dme2RouteOffer != null) {
- props.setProperty(DmaapTopicSinkFactory.DME2_ROUTE_OFFER_PROPERTY, dme2RouteOffer);
+ props.setProperty(PolicyEndPointProperties.DME2_ROUTE_OFFER_PROPERTY, dme2RouteOffer);
}
props.setProperty("Latitude", busTopicParams.getLatitude());