aboutsummaryrefslogtreecommitdiffstats
path: root/policy-endpoints/src/main
diff options
context:
space:
mode:
authorJim Hahn <jrh3@att.com>2021-07-01 11:41:00 -0400
committerJim Hahn <jrh3@att.com>2021-07-02 08:53:40 -0400
commit67820c17aba5a212ba00ef33c36de2540cdb303e (patch)
tree0b0fafa0fc2daa8bee8c20c7e1a27b64ee7424c3 /policy-endpoints/src/main
parent749ee9da67d32ca8e33169607fcd7139632b29e7 (diff)
Wait after fetch exception on topic
When dmaap is inaccessible for some reason, the topic source frequently enters a fast fail loop, rapidly filling up the log. Modified the code to wait the configured fetchTimeout when this occurs. With any luck, this will also fix the sporadic kubernetes crash-fail loops sometimes seen with the xacml-pdp pod. Modified to limit how long it will sleep after a failure, regardless of the fetchTimeout that was specified. Issue-ID: POLICY-3457 Change-Id: I88e360fb1d31197b46f4959e5ea0ea2d741ad25c Signed-off-by: Jim Hahn <jrh3@att.com>
Diffstat (limited to 'policy-endpoints/src/main')
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java22
1 files changed, 18 insertions, 4 deletions
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java
index 1e2c82b1..20f4c91c 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java
@@ -33,6 +33,7 @@ import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import lombok.Getter;
import org.apache.commons.lang3.StringUtils;
import org.onap.dmaap.mr.client.MRClientFactory;
import org.onap.dmaap.mr.client.impl.MRConsumerImpl;
@@ -73,9 +74,15 @@ public interface BusConsumer {
protected int fetchTimeout;
/**
+ * Time to sleep on a fetch failure.
+ */
+ @Getter
+ private final int sleepTime;
+
+ /**
* Counted down when {@link #close()} is invoked.
*/
- private CountDownLatch closeCondition = new CountDownLatch(1);
+ private final CountDownLatch closeCondition = new CountDownLatch(1);
/**
@@ -85,6 +92,13 @@ public interface BusConsumer {
*/
protected FetchingBusConsumer(BusTopicParams busTopicParams) {
this.fetchTimeout = busTopicParams.getFetchTimeout();
+
+ if (this.fetchTimeout <= 0) {
+ this.sleepTime = PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH;
+ } else {
+ // don't sleep too long, even if fetch timeout is large
+ this.sleepTime = Math.min(this.fetchTimeout, PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH);
+ }
}
/**
@@ -93,7 +107,8 @@ public interface BusConsumer {
*/
protected void sleepAfterFetchFailure() {
try {
- if (this.closeCondition.await(this.fetchTimeout, TimeUnit.MILLISECONDS)) {
+ logger.info("{}: backoff for {}ms", this, sleepTime);
+ if (this.closeCondition.await(this.sleepTime, TimeUnit.MILLISECONDS)) {
logger.info("{}: closed while handling fetch error", this);
}
@@ -185,8 +200,7 @@ public interface BusConsumer {
try {
return this.consumer.fetch();
} catch (final IOException e) { //NOSONAR
- logger.error("{}: cannot fetch because of {} - backoff for {} ms.", this, e.getMessage(),
- this.fetchTimeout);
+ logger.error("{}: cannot fetch because of {}", this, e.getMessage());
sleepAfterFetchFailure();
throw e;
}