diff options
author | Jim Hahn <jrh3@att.com> | 2021-07-01 11:41:00 -0400 |
---|---|---|
committer | Jim Hahn <jrh3@att.com> | 2021-07-02 08:53:40 -0400 |
commit | 67820c17aba5a212ba00ef33c36de2540cdb303e (patch) | |
tree | 0b0fafa0fc2daa8bee8c20c7e1a27b64ee7424c3 /policy-endpoints/src/main/java | |
parent | 749ee9da67d32ca8e33169607fcd7139632b29e7 (diff) |
Wait after fetch exception on topic
When dmaap is inaccessible for some reason, the topic source frequently
enters a fast fail loop, rapidly filling up the log. Modified the code
to wait the configured fetchTimeout when this occurs.
With any luck, this will also fix the sporadic kubernetes crash-fail
loops sometimes seen with the xacml-pdp pod.
Modified to limit how long it will sleep after a failure, regardless of
the fetchTimeout that was specified.
Issue-ID: POLICY-3457
Change-Id: I88e360fb1d31197b46f4959e5ea0ea2d741ad25c
Signed-off-by: Jim Hahn <jrh3@att.com>
Diffstat (limited to 'policy-endpoints/src/main/java')
-rw-r--r-- | policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java | 22 |
1 files changed, 18 insertions, 4 deletions
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java index 1e2c82b1..20f4c91c 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java @@ -33,6 +33,7 @@ import java.util.Map; import java.util.Properties; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import lombok.Getter; import org.apache.commons.lang3.StringUtils; import org.onap.dmaap.mr.client.MRClientFactory; import org.onap.dmaap.mr.client.impl.MRConsumerImpl; @@ -73,9 +74,15 @@ public interface BusConsumer { protected int fetchTimeout; /** + * Time to sleep on a fetch failure. + */ + @Getter + private final int sleepTime; + + /** * Counted down when {@link #close()} is invoked. */ - private CountDownLatch closeCondition = new CountDownLatch(1); + private final CountDownLatch closeCondition = new CountDownLatch(1); /** @@ -85,6 +92,13 @@ public interface BusConsumer { */ protected FetchingBusConsumer(BusTopicParams busTopicParams) { this.fetchTimeout = busTopicParams.getFetchTimeout(); + + if (this.fetchTimeout <= 0) { + this.sleepTime = PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH; + } else { + // don't sleep too long, even if fetch timeout is large + this.sleepTime = Math.min(this.fetchTimeout, PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH); + } } /** @@ -93,7 +107,8 @@ public interface BusConsumer { */ protected void sleepAfterFetchFailure() { try { - if (this.closeCondition.await(this.fetchTimeout, TimeUnit.MILLISECONDS)) { + logger.info("{}: backoff for {}ms", this, sleepTime); + if (this.closeCondition.await(this.sleepTime, TimeUnit.MILLISECONDS)) { logger.info("{}: closed while handling fetch error", this); } @@ -185,8 +200,7 @@ public interface BusConsumer { try { return this.consumer.fetch(); } catch (final IOException e) { //NOSONAR - logger.error("{}: cannot fetch because of {} - backoff for {} ms.", this, e.getMessage(), - this.fetchTimeout); + logger.error("{}: cannot fetch because of {}", this, e.getMessage()); sleepAfterFetchFailure(); throw e; } |