From 965d274773192f8f44704d73146efc5bb1a1c395 Mon Sep 17 00:00:00 2001 From: Jim Hahn Date: Wed, 18 Apr 2018 11:12:54 -0400 Subject: Close old UEB/DMaaP consumer Modified code to close the old consumer when the filter is changed. Made some changes to toString() methods to resolve some sonar issues. Modified so-as to not interfere with fetch(). Use synchronized instead of AtomicReferences. Change-Id: I4c9d2cc32993208fd345e66ef1f1dce7a1e7de7d Issue-ID: POLICY-750 Signed-off-by: Jim Hahn --- .../event/comm/bus/internal/BusConsumer.java | 80 ++++++++++++++++------ 1 file changed, 58 insertions(+), 22 deletions(-) diff --git a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/BusConsumer.java b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/BusConsumer.java index 70c37d55..a299060d 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/BusConsumer.java +++ b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/BusConsumer.java @@ -27,7 +27,6 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Properties; - import org.onap.policy.drools.event.comm.bus.DmaapTopicSinkFactory; import org.onap.policy.drools.properties.PolicyProperties; import org.slf4j.Logger; @@ -89,11 +88,21 @@ public interface BusConsumer { * Used to build the consumer. */ private final ConsumerBuilder builder; + + /** + * Locked while updating {@link #consumer} and {@link #newConsumer}. + */ + private final Object consLocker = new Object(); /** * Cambria client */ - protected volatile CambriaConsumer consumer; + private CambriaConsumer consumer; + + /** + * Cambria client to use for next fetch + */ + private CambriaConsumer newConsumer = null; /** * fetch timeout @@ -168,7 +177,7 @@ public interface BusConsumer { @Override public Iterable fetch() throws IOException, InterruptedException { try { - return this.consumer.fetch(); + return getCurrentConsumer().fetch(); } catch (final IOException e) { logger.error("{}: cannot fetch because of {} - backoff for {} ms.", this, e.getMessage(), this.fetchTimeout); @@ -186,7 +195,29 @@ public interface BusConsumer { closeCondition.notifyAll(); } - this.consumer.close(); + getCurrentConsumer().close(); + } + + private CambriaConsumer getCurrentConsumer() { + CambriaConsumer old = null; + CambriaConsumer ret; + + synchronized(consLocker) { + if(this.newConsumer != null) { + // replace old consumer with new consumer + old = this.consumer; + this.consumer = this.newConsumer; + this.newConsumer = null; + } + + ret = this.consumer; + } + + if(old != null) { + old.close(); + } + + return ret; } @Override @@ -195,18 +226,29 @@ public interface BusConsumer { builder.withServerSideFilter(filter); try { - consumer = builder.build(); + CambriaConsumer previous; + synchronized(consLocker) { + previous = this.newConsumer; + this.newConsumer = builder.build(); + } + + if(previous != null) { + // there was already a new consumer - close it + previous.close(); + } } catch (MalformedURLException | GeneralSecurityException e) { + /* + * Since an exception occurred, "consumer" still has its old value, + * thus it should not be closed at this point. + */ throw new IllegalArgumentException(e); } } @Override public String toString() { - final StringBuilder builder = new StringBuilder(); - builder.append("CambriaConsumerWrapper [fetchTimeout=").append(fetchTimeout).append("]"); - return builder.toString(); + return "CambriaConsumerWrapper [fetchTimeout=" + fetchTimeout + "]"; } } @@ -311,13 +353,10 @@ public interface BusConsumer { @Override public String toString() { - final StringBuilder builder = new StringBuilder(); - builder.append("DmaapConsumerWrapper [").append("consumer.getAuthDate()=") - .append(consumer.getAuthDate()).append(", consumer.getAuthKey()=") - .append(consumer.getAuthKey()).append(", consumer.getHost()=").append(consumer.getHost()) - .append(", consumer.getProtocolFlag()=").append(consumer.getProtocolFlag()) - .append(", consumer.getUsername()=").append(consumer.getUsername()).append("]"); - return builder.toString(); + return "DmaapConsumerWrapper [" + "consumer.getAuthDate()=" + consumer.getAuthDate() + + ", consumer.getAuthKey()=" + consumer.getAuthKey() + ", consumer.getHost()=" + + consumer.getHost() + ", consumer.getProtocolFlag()=" + consumer.getProtocolFlag() + + ", consumer.getUsername()=" + consumer.getUsername() + "]"; } } @@ -378,15 +417,12 @@ public interface BusConsumer { @Override public String toString() { - final StringBuilder builder = new StringBuilder(); final MRConsumerImpl consumer = this.consumer; - builder.append("DmaapConsumerWrapper [").append("consumer.getAuthDate()=") - .append(consumer.getAuthDate()).append(", consumer.getAuthKey()=") - .append(consumer.getAuthKey()).append(", consumer.getHost()=").append(consumer.getHost()) - .append(", consumer.getProtocolFlag()=").append(consumer.getProtocolFlag()) - .append(", consumer.getUsername()=").append(consumer.getUsername()).append("]"); - return builder.toString(); + return "DmaapConsumerWrapper [" + "consumer.getAuthDate()=" + consumer.getAuthDate() + + ", consumer.getAuthKey()=" + consumer.getAuthKey() + ", consumer.getHost()=" + + consumer.getHost() + ", consumer.getProtocolFlag()=" + consumer.getProtocolFlag() + + ", consumer.getUsername()=" + consumer.getUsername() + "]"; } } -- cgit 1.2.3-korg