summaryrefslogtreecommitdiffstats
path: root/policy-endpoints/src
diff options
context:
space:
mode:
authorJim Hahn <jrh3@att.com>2018-04-18 11:12:54 -0400
committerJorge Hernandez <jh1730@att.com>2018-04-19 19:04:43 +0000
commit965d274773192f8f44704d73146efc5bb1a1c395 (patch)
tree2731a71d6fe47ee8cdd96962897284fd1b36c485 /policy-endpoints/src
parentdbdef1e3e3803b5848a1fad746c652f9359d4313 (diff)
Close old UEB/DMaaP consumer
Modified code to close the old consumer when the filter is changed. Made some changes to toString() methods to resolve some sonar issues. Modified so-as to not interfere with fetch(). Use synchronized instead of AtomicReferences. Change-Id: I4c9d2cc32993208fd345e66ef1f1dce7a1e7de7d Issue-ID: POLICY-750 Signed-off-by: Jim Hahn <jrh3@att.com>
Diffstat (limited to 'policy-endpoints/src')
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/BusConsumer.java80
1 files changed, 58 insertions, 22 deletions
diff --git a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/BusConsumer.java b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/BusConsumer.java
index 70c37d55..a299060d 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/BusConsumer.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/BusConsumer.java
@@ -27,7 +27,6 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
-
import org.onap.policy.drools.event.comm.bus.DmaapTopicSinkFactory;
import org.onap.policy.drools.properties.PolicyProperties;
import org.slf4j.Logger;
@@ -89,11 +88,21 @@ public interface BusConsumer {
* Used to build the consumer.
*/
private final ConsumerBuilder builder;
+
+ /**
+ * Locked while updating {@link #consumer} and {@link #newConsumer}.
+ */
+ private final Object consLocker = new Object();
/**
* Cambria client
*/
- protected volatile CambriaConsumer consumer;
+ private CambriaConsumer consumer;
+
+ /**
+ * Cambria client to use for next fetch
+ */
+ private CambriaConsumer newConsumer = null;
/**
* fetch timeout
@@ -168,7 +177,7 @@ public interface BusConsumer {
@Override
public Iterable<String> fetch() throws IOException, InterruptedException {
try {
- return this.consumer.fetch();
+ return getCurrentConsumer().fetch();
} catch (final IOException e) {
logger.error("{}: cannot fetch because of {} - backoff for {} ms.", this, e.getMessage(),
this.fetchTimeout);
@@ -186,7 +195,29 @@ public interface BusConsumer {
closeCondition.notifyAll();
}
- this.consumer.close();
+ getCurrentConsumer().close();
+ }
+
+ private CambriaConsumer getCurrentConsumer() {
+ CambriaConsumer old = null;
+ CambriaConsumer ret;
+
+ synchronized(consLocker) {
+ if(this.newConsumer != null) {
+ // replace old consumer with new consumer
+ old = this.consumer;
+ this.consumer = this.newConsumer;
+ this.newConsumer = null;
+ }
+
+ ret = this.consumer;
+ }
+
+ if(old != null) {
+ old.close();
+ }
+
+ return ret;
}
@Override
@@ -195,18 +226,29 @@ public interface BusConsumer {
builder.withServerSideFilter(filter);
try {
- consumer = builder.build();
+ CambriaConsumer previous;
+ synchronized(consLocker) {
+ previous = this.newConsumer;
+ this.newConsumer = builder.build();
+ }
+
+ if(previous != null) {
+ // there was already a new consumer - close it
+ previous.close();
+ }
} catch (MalformedURLException | GeneralSecurityException e) {
+ /*
+ * Since an exception occurred, "consumer" still has its old value,
+ * thus it should not be closed at this point.
+ */
throw new IllegalArgumentException(e);
}
}
@Override
public String toString() {
- final StringBuilder builder = new StringBuilder();
- builder.append("CambriaConsumerWrapper [fetchTimeout=").append(fetchTimeout).append("]");
- return builder.toString();
+ return "CambriaConsumerWrapper [fetchTimeout=" + fetchTimeout + "]";
}
}
@@ -311,13 +353,10 @@ public interface BusConsumer {
@Override
public String toString() {
- final StringBuilder builder = new StringBuilder();
- builder.append("DmaapConsumerWrapper [").append("consumer.getAuthDate()=")
- .append(consumer.getAuthDate()).append(", consumer.getAuthKey()=")
- .append(consumer.getAuthKey()).append(", consumer.getHost()=").append(consumer.getHost())
- .append(", consumer.getProtocolFlag()=").append(consumer.getProtocolFlag())
- .append(", consumer.getUsername()=").append(consumer.getUsername()).append("]");
- return builder.toString();
+ return "DmaapConsumerWrapper [" + "consumer.getAuthDate()=" + consumer.getAuthDate()
+ + ", consumer.getAuthKey()=" + consumer.getAuthKey() + ", consumer.getHost()="
+ + consumer.getHost() + ", consumer.getProtocolFlag()=" + consumer.getProtocolFlag()
+ + ", consumer.getUsername()=" + consumer.getUsername() + "]";
}
}
@@ -378,15 +417,12 @@ public interface BusConsumer {
@Override
public String toString() {
- final StringBuilder builder = new StringBuilder();
final MRConsumerImpl consumer = this.consumer;
- builder.append("DmaapConsumerWrapper [").append("consumer.getAuthDate()=")
- .append(consumer.getAuthDate()).append(", consumer.getAuthKey()=")
- .append(consumer.getAuthKey()).append(", consumer.getHost()=").append(consumer.getHost())
- .append(", consumer.getProtocolFlag()=").append(consumer.getProtocolFlag())
- .append(", consumer.getUsername()=").append(consumer.getUsername()).append("]");
- return builder.toString();
+ return "DmaapConsumerWrapper [" + "consumer.getAuthDate()=" + consumer.getAuthDate()
+ + ", consumer.getAuthKey()=" + consumer.getAuthKey() + ", consumer.getHost()="
+ + consumer.getHost() + ", consumer.getProtocolFlag()=" + consumer.getProtocolFlag()
+ + ", consumer.getUsername()=" + consumer.getUsername() + "]";
}
}