diff options
Diffstat (limited to 'policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal')
3 files changed, 39 insertions, 23 deletions
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java index abf793d6..b66b4ba5 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java @@ -24,7 +24,6 @@ package org.onap.policy.common.endpoints.event.comm.bus.internal; import com.att.nsa.cambria.client.CambriaClientBuilders; import com.att.nsa.cambria.client.CambriaClientBuilders.ConsumerBuilder; import com.att.nsa.cambria.client.CambriaConsumer; - import java.io.IOException; import java.net.MalformedURLException; import java.security.GeneralSecurityException; @@ -38,7 +37,6 @@ import org.onap.dmaap.mr.client.MRClientFactory; import org.onap.dmaap.mr.client.impl.MRConsumerImpl; import org.onap.dmaap.mr.client.response.MRConsumerResponse; import org.onap.dmaap.mr.test.clients.ProtocolTypeConstants; -import org.onap.policy.common.endpoints.event.comm.bus.DmaapTopicSinkFactory; import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -52,9 +50,9 @@ public interface BusConsumer { * fetch messages. * * @return list of messages - * @throws Exception when error encountered by underlying libraries + * @throws IOException when error encountered by underlying libraries */ - public Iterable<String> fetch() throws InterruptedException, IOException; + public Iterable<String> fetch() throws IOException; /** * close underlying library consumer. @@ -168,16 +166,24 @@ public interface BusConsumer { } @Override - public Iterable<String> fetch() throws IOException, InterruptedException { + public Iterable<String> fetch() throws IOException { try { return getCurrentConsumer().fetch(); } catch (final IOException e) { logger.error("{}: cannot fetch because of {} - backoff for {} ms.", this, e.getMessage(), - this.fetchTimeout, e); + this.fetchTimeout); + sleepAfterFetchFailure(); + throw e; + } + } + private void sleepAfterFetchFailure() { + try { this.closeCondition.await(this.fetchTimeout, TimeUnit.MILLISECONDS); - throw e; + } catch (InterruptedException e) { + logger.warn("{}: interrupted while handling fetch error", this, e); + Thread.currentThread().interrupt(); } } @@ -306,12 +312,12 @@ public interface BusConsumer { } @Override - public Iterable<String> fetch() throws InterruptedException, IOException { + public Iterable<String> fetch() throws IOException { final MRConsumerResponse response = this.consumer.fetchWithReturnConsumerResponse(); if (response == null) { logger.warn("{}: DMaaP NULL response received", this); - closeCondition.await(fetchTimeout, TimeUnit.MILLISECONDS); + sleepAfterFetchFailure(); return new ArrayList<>(); } else { logger.debug("DMaaP consumer received {} : {}", response.getResponseCode(), @@ -322,7 +328,7 @@ public interface BusConsumer { logger.error("DMaaP consumer received: {} : {}", response.getResponseCode(), response.getResponseMessage()); - closeCondition.await(fetchTimeout, TimeUnit.MILLISECONDS); + sleepAfterFetchFailure(); /* fall through */ } @@ -335,6 +341,16 @@ public interface BusConsumer { } } + private void sleepAfterFetchFailure() { + try { + this.closeCondition.await(this.fetchTimeout, TimeUnit.MILLISECONDS); + + } catch (InterruptedException e) { + logger.warn("{}: interrupted while handling fetch error", this, e); + Thread.currentThread().interrupt(); + } + } + @Override public void close() { this.closeCondition.countDown(); @@ -434,7 +450,8 @@ public interface BusConsumer { final String dme2RouteOffer = (busTopicParams.isAdditionalPropsValid() - ? busTopicParams.getAdditionalProps().get(DmaapTopicSinkFactory.DME2_ROUTE_OFFER_PROPERTY) + ? busTopicParams.getAdditionalProps().get( + PolicyEndPointProperties.DME2_ROUTE_OFFER_PROPERTY) : null); if (busTopicParams.isEnvironmentInvalid()) { @@ -474,7 +491,7 @@ public interface BusConsumer { props = new Properties(); - props.setProperty(DmaapTopicSinkFactory.DME2_SERVICE_NAME_PROPERTY, serviceName); + props.setProperty(PolicyEndPointProperties.DME2_SERVICE_NAME_PROPERTY, serviceName); props.setProperty("username", busTopicParams.getUserName()); props.setProperty("password", busTopicParams.getPassword()); @@ -489,7 +506,7 @@ public interface BusConsumer { props.setProperty("Partner", busTopicParams.getPartner()); } if (dme2RouteOffer != null) { - props.setProperty(DmaapTopicSinkFactory.DME2_ROUTE_OFFER_PROPERTY, dme2RouteOffer); + props.setProperty(PolicyEndPointProperties.DME2_ROUTE_OFFER_PROPERTY, dme2RouteOffer); } props.setProperty("Latitude", busTopicParams.getLatitude()); diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java index 67adf3b4..469794c7 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java @@ -26,7 +26,6 @@ import com.att.nsa.cambria.client.CambriaBatchingPublisher; import com.att.nsa.cambria.client.CambriaClientBuilders; import com.att.nsa.cambria.client.CambriaClientBuilders.PublisherBuilder; import com.fasterxml.jackson.annotation.JsonIgnore; - import java.net.MalformedURLException; import java.security.GeneralSecurityException; import java.util.ArrayList; @@ -34,12 +33,10 @@ import java.util.List; import java.util.Map; import java.util.Properties; import java.util.concurrent.TimeUnit; - import org.apache.commons.lang3.StringUtils; import org.onap.dmaap.mr.client.impl.MRSimplerBatchPublisher; import org.onap.dmaap.mr.client.response.MRPublisherResponse; import org.onap.dmaap.mr.test.clients.ProtocolTypeConstants; -import org.onap.policy.common.endpoints.event.comm.bus.DmaapTopicSinkFactory; import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties; import org.onap.policy.common.gson.annotation.GsonJsonIgnore; import org.slf4j.Logger; @@ -299,7 +296,8 @@ public interface BusPublisher { busTopicParams.getUserName(),busTopicParams.getPassword(),busTopicParams.isUseHttps()); String dme2RouteOffer = busTopicParams.isAdditionalPropsValid() - ? busTopicParams.getAdditionalProps().get(DmaapTopicSinkFactory.DME2_ROUTE_OFFER_PROPERTY) + ? busTopicParams.getAdditionalProps().get( + PolicyEndPointProperties.DME2_ROUTE_OFFER_PROPERTY) : null; validateParams(busTopicParams, dme2RouteOffer); @@ -310,13 +308,13 @@ public interface BusPublisher { props.setProperty("Environment", busTopicParams.getEnvironment()); props.setProperty("AFT_ENVIRONMENT", busTopicParams.getAftEnvironment()); - props.setProperty(DmaapTopicSinkFactory.DME2_SERVICE_NAME_PROPERTY, serviceName); + props.setProperty(PolicyEndPointProperties.DME2_SERVICE_NAME_PROPERTY, serviceName); if (busTopicParams.getPartner() != null) { props.setProperty("Partner", busTopicParams.getPartner()); } if (dme2RouteOffer != null) { - props.setProperty(DmaapTopicSinkFactory.DME2_ROUTE_OFFER_PROPERTY, dme2RouteOffer); + props.setProperty(PolicyEndPointProperties.DME2_ROUTE_OFFER_PROPERTY, dme2RouteOffer); } props.setProperty("Latitude", busTopicParams.getLatitude()); diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedBusTopicSource.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedBusTopicSource.java index 0953465b..164f2b16 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedBusTopicSource.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedBusTopicSource.java @@ -29,6 +29,7 @@ import org.onap.policy.common.endpoints.event.comm.FilterableTopicSource; import org.onap.policy.common.endpoints.event.comm.TopicListener; import org.onap.policy.common.endpoints.event.comm.bus.BusTopicSource; import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer.FilterableBusConsumer; +import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties; import org.onap.policy.common.endpoints.utils.NetLoggerUtil; import org.onap.policy.common.endpoints.utils.NetLoggerUtil.EventType; import org.onap.policy.common.utils.network.NetworkUtil; @@ -103,13 +104,13 @@ public abstract class SingleThreadedBusTopicSource extends BusTopicBase } if (busTopicParams.getFetchTimeout() <= 0) { - this.fetchTimeout = NO_TIMEOUT_MS_FETCH; + this.fetchTimeout = PolicyEndPointProperties.NO_TIMEOUT_MS_FETCH; } else { this.fetchTimeout = busTopicParams.getFetchTimeout(); } if (busTopicParams.getFetchLimit() <= 0) { - this.fetchLimit = NO_LIMIT_FETCH; + this.fetchLimit = PolicyEndPointProperties.NO_LIMIT_FETCH; } else { this.fetchLimit = busTopicParams.getFetchLimit(); } @@ -225,7 +226,7 @@ public abstract class SingleThreadedBusTopicSource extends BusTopicBase while (this.alive) { try { fetchAllMessages(); - } catch (Exception e) { + } catch (IOException | RuntimeException e) { logger.error("{}: cannot fetch because of ", this, e.getMessage(), e); } } @@ -233,7 +234,7 @@ public abstract class SingleThreadedBusTopicSource extends BusTopicBase logger.info("{}: exiting thread", this); } - private void fetchAllMessages() throws InterruptedException, IOException { + private void fetchAllMessages() throws IOException { for (String event : this.consumer.fetch()) { synchronized (this) { this.recentEvents.add(event); |