diff options
2 files changed, 52 insertions, 10 deletions
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java index 1e2c82b1..20f4c91c 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java @@ -33,6 +33,7 @@ import java.util.Map; import java.util.Properties; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import lombok.Getter; import org.apache.commons.lang3.StringUtils; import org.onap.dmaap.mr.client.MRClientFactory; import org.onap.dmaap.mr.client.impl.MRConsumerImpl; @@ -73,9 +74,15 @@ public interface BusConsumer { protected int fetchTimeout; /** + * Time to sleep on a fetch failure. + */ + @Getter + private final int sleepTime; + + /** * Counted down when {@link #close()} is invoked. */ - private CountDownLatch closeCondition = new CountDownLatch(1); + private final CountDownLatch closeCondition = new CountDownLatch(1); /** @@ -85,6 +92,13 @@ public interface BusConsumer { */ protected FetchingBusConsumer(BusTopicParams busTopicParams) { this.fetchTimeout = busTopicParams.getFetchTimeout(); + + if (this.fetchTimeout <= 0) { + this.sleepTime = PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH; + } else { + // don't sleep too long, even if fetch timeout is large + this.sleepTime = Math.min(this.fetchTimeout, PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH); + } } /** @@ -93,7 +107,8 @@ public interface BusConsumer { */ protected void sleepAfterFetchFailure() { try { - if (this.closeCondition.await(this.fetchTimeout, TimeUnit.MILLISECONDS)) { + logger.info("{}: backoff for {}ms", this, sleepTime); + if (this.closeCondition.await(this.sleepTime, TimeUnit.MILLISECONDS)) { logger.info("{}: closed while handling fetch error", this); } @@ -185,8 +200,7 @@ public interface BusConsumer { try { return this.consumer.fetch(); } catch (final IOException e) { //NOSONAR - logger.error("{}: cannot fetch because of {} - backoff for {} ms.", this, e.getMessage(), - this.fetchTimeout); + logger.error("{}: cannot fetch because of {}", this, e.getMessage()); sleepAfterFetchFailure(); throw e; } diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumerTest.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumerTest.java index aba05887..21050f97 100644 --- a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumerTest.java +++ b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumerTest.java @@ -46,6 +46,7 @@ import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer.Dmaa import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer.DmaapConsumerWrapper; import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer.DmaapDmeConsumerWrapper; import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer.FetchingBusConsumer; +import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties; import org.powermock.reflect.Whitebox; public class BusConsumerTest extends TopicTestBase { @@ -60,9 +61,29 @@ public class BusConsumerTest extends TopicTestBase { } @Test + public void testFetchingBusConsumer() throws InterruptedException { + // should not be negative + var cons = new FetchingBusConsumerImpl(makeBuilder().fetchTimeout(-1).build()); + assertThat(cons.getSleepTime()).isEqualTo(PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH); + + // should not be zero + cons = new FetchingBusConsumerImpl(makeBuilder().fetchTimeout(0).build()); + assertThat(cons.getSleepTime()).isEqualTo(PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH); + + // should not be too large + cons = new FetchingBusConsumerImpl( + makeBuilder().fetchTimeout(PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH + 100).build()); + assertThat(cons.getSleepTime()).isEqualTo(PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH); + + // should not be what was specified + cons = new FetchingBusConsumerImpl(makeBuilder().fetchTimeout(100).build()); + assertThat(cons.getSleepTime()).isEqualTo(100); + } + + @Test public void testFetchingBusConsumerSleepAfterFetchFailure() throws InterruptedException { - var cons = new FetchingBusConsumer(makeBuilder().fetchTimeout(SHORT_TIMEOUT_MILLIS).build()) { + var cons = new FetchingBusConsumerImpl(makeBuilder().fetchTimeout(SHORT_TIMEOUT_MILLIS).build()) { private CountDownLatch started = new CountDownLatch(1); @@ -71,11 +92,6 @@ public class BusConsumerTest extends TopicTestBase { started.countDown(); super.sleepAfterFetchFailure(); } - - @Override - public Iterable<String> fetch() throws IOException { - return null; - } }; // full sleep @@ -278,4 +294,16 @@ public class BusConsumerTest extends TopicTestBase { public void testDmaapDmeConsumerWrapper_InvalidPartner() throws Exception { new DmaapDmeConsumerWrapper(makeBuilder().partner(null).build()); } + + private static class FetchingBusConsumerImpl extends FetchingBusConsumer { + + protected FetchingBusConsumerImpl(BusTopicParams busTopicParams) { + super(busTopicParams); + } + + @Override + public Iterable<String> fetch() throws IOException { + return null; + } + } } |