diff options
2 files changed, 109 insertions, 50 deletions
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java index 5fb3aedb..1e2c82b1 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * policy-endpoints * ================================================================================ - * Copyright (C) 2017-2020 AT&T Intellectual Property. All rights reserved. + * Copyright (C) 2017-2021 AT&T Intellectual Property. All rights reserved. * Modifications Copyright (C) 2018 Samsung Electronics Co., Ltd. * Modifications Copyright (C) 2020 Bell Canada. All rights reserved. * ================================================================================ @@ -62,9 +62,57 @@ public interface BusConsumer { public void close(); /** + * Consumer that handles fetch() failures by sleeping. + */ + public abstract static class FetchingBusConsumer implements BusConsumer { + private static Logger logger = LoggerFactory.getLogger(FetchingBusConsumer.class); + + /** + * Fetch timeout. + */ + protected int fetchTimeout; + + /** + * Counted down when {@link #close()} is invoked. + */ + private CountDownLatch closeCondition = new CountDownLatch(1); + + + /** + * Constructs the object. + * + * @param busTopicParams parameters for the bus topic + */ + protected FetchingBusConsumer(BusTopicParams busTopicParams) { + this.fetchTimeout = busTopicParams.getFetchTimeout(); + } + + /** + * Causes the thread to sleep; invoked after fetch() fails. If the consumer is closed, + * or the thread is interrupted, then this will return immediately. + */ + protected void sleepAfterFetchFailure() { + try { + if (this.closeCondition.await(this.fetchTimeout, TimeUnit.MILLISECONDS)) { + logger.info("{}: closed while handling fetch error", this); + } + + } catch (InterruptedException e) { + logger.warn("{}: interrupted while handling fetch error", this, e); + Thread.currentThread().interrupt(); + } + } + + @Override + public void close() { + this.closeCondition.countDown(); + } + } + + /** * Cambria based consumer. */ - public static class CambriaConsumerWrapper implements BusConsumer { + public static class CambriaConsumerWrapper extends FetchingBusConsumer { /** * logger. @@ -82,16 +130,6 @@ public interface BusConsumer { private final CambriaConsumer consumer; /** - * fetch timeout. - */ - protected int fetchTimeout; - - /** - * close condition. - */ - protected CountDownLatch closeCondition = new CountDownLatch(1); - - /** * Cambria Consumer Wrapper. * BusTopicParam object contains the following parameters * servers messaging bus hosts. @@ -108,8 +146,7 @@ public interface BusConsumer { * @throws MalformedURLException - Malformed URL exception */ public CambriaConsumerWrapper(BusTopicParams busTopicParams) { - - this.fetchTimeout = busTopicParams.getFetchTimeout(); + super(busTopicParams); this.builder = new CambriaClientBuilders.ConsumerBuilder(); @@ -155,19 +192,9 @@ public interface BusConsumer { } } - private void sleepAfterFetchFailure() { - try { - this.closeCondition.await(this.fetchTimeout, TimeUnit.MILLISECONDS); //NOSONAR - - } catch (InterruptedException e) { - logger.warn("{}: interrupted while handling fetch error", this, e); - Thread.currentThread().interrupt(); - } - } - @Override public void close() { - this.closeCondition.countDown(); + super.close(); this.consumer.close(); } @@ -180,7 +207,7 @@ public interface BusConsumer { /** * MR based consumer. */ - public abstract class DmaapConsumerWrapper implements BusConsumer { + public abstract class DmaapConsumerWrapper extends FetchingBusConsumer { /** * logger. @@ -193,16 +220,6 @@ public interface BusConsumer { protected static final String PROTOCOL_PROP = "Protocol"; /** - * fetch timeout. - */ - protected int fetchTimeout; - - /** - * close condition. - */ - protected CountDownLatch closeCondition = new CountDownLatch(1); - - /** * MR Consumer. */ protected MRConsumerImpl consumer; @@ -225,8 +242,7 @@ public interface BusConsumer { * @throws MalformedURLException URL should be valid */ protected DmaapConsumerWrapper(BusTopicParams busTopicParams) throws MalformedURLException { - - this.fetchTimeout = busTopicParams.getFetchTimeout(); + super(busTopicParams); if (busTopicParams.isTopicInvalid()) { throw new IllegalArgumentException("No topic for DMaaP"); @@ -277,19 +293,9 @@ public interface BusConsumer { } } - private void sleepAfterFetchFailure() { - try { - this.closeCondition.await(this.fetchTimeout, TimeUnit.MILLISECONDS); //NOSONAR - - } catch (InterruptedException e) { - logger.warn("{}: interrupted while handling fetch error", this, e); - Thread.currentThread().interrupt(); - } - } - @Override public void close() { - this.closeCondition.countDown(); + super.close(); this.consumer.close(); } diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumerTest.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumerTest.java index 82d5eef8..aba05887 100644 --- a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumerTest.java +++ b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumerTest.java @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * policy-endpoints * ================================================================================ - * Copyright (C) 2018-2020 AT&T Intellectual Property. All rights reserved. + * Copyright (C) 2018-2021 AT&T Intellectual Property. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,6 +20,7 @@ package org.onap.policy.common.endpoints.event.comm.bus.internal; +import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatCode; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -33,6 +34,7 @@ import java.io.IOException; import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.concurrent.CountDownLatch; import org.apache.commons.collections4.IteratorUtils; import org.junit.Before; import org.junit.Test; @@ -43,10 +45,14 @@ import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer.Camb import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer.DmaapAafConsumerWrapper; import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer.DmaapConsumerWrapper; import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer.DmaapDmeConsumerWrapper; +import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer.FetchingBusConsumer; import org.powermock.reflect.Whitebox; public class BusConsumerTest extends TopicTestBase { + private static final int SHORT_TIMEOUT_MILLIS = 10; + private static final int LONG_TIMEOUT_MILLIS = 3000; + @Before @Override public void setUp() { @@ -54,6 +60,53 @@ public class BusConsumerTest extends TopicTestBase { } @Test + public void testFetchingBusConsumerSleepAfterFetchFailure() throws InterruptedException { + + var cons = new FetchingBusConsumer(makeBuilder().fetchTimeout(SHORT_TIMEOUT_MILLIS).build()) { + + private CountDownLatch started = new CountDownLatch(1); + + @Override + protected void sleepAfterFetchFailure() { + started.countDown(); + super.sleepAfterFetchFailure(); + } + + @Override + public Iterable<String> fetch() throws IOException { + return null; + } + }; + + // full sleep + long tstart = System.currentTimeMillis(); + cons.sleepAfterFetchFailure(); + assertThat(System.currentTimeMillis() - tstart).isGreaterThanOrEqualTo(SHORT_TIMEOUT_MILLIS); + + // close while sleeping - sleep should halt prematurely + cons.fetchTimeout = LONG_TIMEOUT_MILLIS; + cons.started = new CountDownLatch(1); + Thread thread = new Thread(cons::sleepAfterFetchFailure); + tstart = System.currentTimeMillis(); + thread.start(); + cons.started.await(); + cons.close(); + thread.join(); + assertThat(System.currentTimeMillis() - tstart).isLessThan(LONG_TIMEOUT_MILLIS); + + // interrupt while sleeping - sleep should halt prematurely + cons.fetchTimeout = LONG_TIMEOUT_MILLIS; + cons.started = new CountDownLatch(1); + thread = new Thread(cons::sleepAfterFetchFailure); + tstart = System.currentTimeMillis(); + thread.start(); + cons.started.await(); + thread.interrupt(); + thread.join(); + assertThat(System.currentTimeMillis() - tstart).isLessThan(LONG_TIMEOUT_MILLIS); + } + + @Test public void testCambriaConsumerWrapper() { // verify that different wrappers can be built new CambriaConsumerWrapper(makeBuilder().build()); |