From 7b1f4e95ccd02ec4b49c8e7da558f2b6f20d48e1 Mon Sep 17 00:00:00 2001 From: Jim Hahn Date: Thu, 29 Oct 2020 16:07:19 -0400 Subject: Remove server-side filtering from policy-endpoints ONAP DMaaP Message Router no longer supports server-side filtering. Removed it from policy-endpoints. Issue-ID: POLICY-2881 Change-Id: I08157f7699608af63992dec78a61c5f9c55037b9 Signed-off-by: Jim Hahn --- .../event/comm/bus/internal/BusConsumer.java | 80 ++-------------------- 1 file changed, 4 insertions(+), 76 deletions(-) (limited to 'policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java') diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java index 233434f1..60ab2e9e 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java @@ -60,24 +60,10 @@ public interface BusConsumer { */ public void close(); - /** - * BusConsumer that supports server-side filtering. - */ - public interface FilterableBusConsumer extends BusConsumer { - - /** - * Sets the server-side filter. - * - * @param filter new filter value, or {@code null} - * @throws IllegalArgumentException if the consumer cannot be built with the new filter - */ - public void setFilter(String filter); - } - /** * Cambria based consumer. */ - public static class CambriaConsumerWrapper implements FilterableBusConsumer { + public static class CambriaConsumerWrapper implements BusConsumer { /** * logger. @@ -89,20 +75,10 @@ public interface BusConsumer { */ private final ConsumerBuilder builder; - /** - * Locked while updating {@link #consumer} and {@link #newConsumer}. - */ - private final Object consLocker = new Object(); - /** * Cambria client. */ - private CambriaConsumer consumer; - - /** - * Cambria client to use for next fetch. - */ - private CambriaConsumer newConsumer = null; + private final CambriaConsumer consumer; /** * fetch timeout. @@ -169,7 +145,7 @@ public interface BusConsumer { @Override public Iterable fetch() throws IOException { try { - return getCurrentConsumer().fetch(); + return this.consumer.fetch(); } catch (final IOException e) { //NOSONAR logger.error("{}: cannot fetch because of {} - backoff for {} ms.", this, e.getMessage(), this.fetchTimeout); @@ -191,55 +167,7 @@ public interface BusConsumer { @Override public void close() { this.closeCondition.countDown(); - getCurrentConsumer().close(); - } - - private CambriaConsumer getCurrentConsumer() { - CambriaConsumer old = null; - CambriaConsumer ret; - - synchronized (consLocker) { - if (this.newConsumer != null) { - // replace old consumer with new consumer - old = this.consumer; - this.consumer = this.newConsumer; - this.newConsumer = null; - } - - ret = this.consumer; - } - - if (old != null) { - old.close(); - } - - return ret; - } - - @Override - public void setFilter(String filter) { - logger.info("{}: setting DMAAP server-side filter: {}", this, filter); - builder.withServerSideFilter(filter); - - try { - CambriaConsumer previous; - synchronized (consLocker) { - previous = this.newConsumer; - this.newConsumer = builder.build(); - } - - if (previous != null) { - // there was already a new consumer - close it - previous.close(); - } - - } catch (MalformedURLException | GeneralSecurityException e) { - /* - * Since an exception occurred, "consumer" still has its old value, thus it should - * not be closed at this point. - */ - throw new IllegalArgumentException(e); - } + this.consumer.close(); } @Override -- cgit 1.2.3-korg