diff options
author | Jim Hahn <jrh3@att.com> | 2020-10-29 16:07:19 -0400 |
---|---|---|
committer | Jim Hahn <jrh3@att.com> | 2020-10-29 16:08:23 -0400 |
commit | 7b1f4e95ccd02ec4b49c8e7da558f2b6f20d48e1 (patch) | |
tree | c00b049a50f56fa0d471ddb97c3e0cf33bd52b3f /policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java | |
parent | a9c3b525fb3f37e56ccf143f35f739fb60024b0e (diff) |
Remove server-side filtering from policy-endpoints
ONAP DMaaP Message Router no longer supports server-side filtering.
Removed it from policy-endpoints.
Issue-ID: POLICY-2881
Change-Id: I08157f7699608af63992dec78a61c5f9c55037b9
Signed-off-by: Jim Hahn <jrh3@att.com>
Diffstat (limited to 'policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java')
-rw-r--r-- | policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java | 80 |
1 files changed, 4 insertions, 76 deletions
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java index 233434f1..60ab2e9e 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java @@ -61,23 +61,9 @@ public interface BusConsumer { public void close(); /** - * BusConsumer that supports server-side filtering. - */ - public interface FilterableBusConsumer extends BusConsumer { - - /** - * Sets the server-side filter. - * - * @param filter new filter value, or {@code null} - * @throws IllegalArgumentException if the consumer cannot be built with the new filter - */ - public void setFilter(String filter); - } - - /** * Cambria based consumer. */ - public static class CambriaConsumerWrapper implements FilterableBusConsumer { + public static class CambriaConsumerWrapper implements BusConsumer { /** * logger. @@ -90,19 +76,9 @@ public interface BusConsumer { private final ConsumerBuilder builder; /** - * Locked while updating {@link #consumer} and {@link #newConsumer}. - */ - private final Object consLocker = new Object(); - - /** * Cambria client. */ - private CambriaConsumer consumer; - - /** - * Cambria client to use for next fetch. - */ - private CambriaConsumer newConsumer = null; + private final CambriaConsumer consumer; /** * fetch timeout. @@ -169,7 +145,7 @@ public interface BusConsumer { @Override public Iterable<String> fetch() throws IOException { try { - return getCurrentConsumer().fetch(); + return this.consumer.fetch(); } catch (final IOException e) { //NOSONAR logger.error("{}: cannot fetch because of {} - backoff for {} ms.", this, e.getMessage(), this.fetchTimeout); @@ -191,55 +167,7 @@ public interface BusConsumer { @Override public void close() { this.closeCondition.countDown(); - getCurrentConsumer().close(); - } - - private CambriaConsumer getCurrentConsumer() { - CambriaConsumer old = null; - CambriaConsumer ret; - - synchronized (consLocker) { - if (this.newConsumer != null) { - // replace old consumer with new consumer - old = this.consumer; - this.consumer = this.newConsumer; - this.newConsumer = null; - } - - ret = this.consumer; - } - - if (old != null) { - old.close(); - } - - return ret; - } - - @Override - public void setFilter(String filter) { - logger.info("{}: setting DMAAP server-side filter: {}", this, filter); - builder.withServerSideFilter(filter); - - try { - CambriaConsumer previous; - synchronized (consLocker) { - previous = this.newConsumer; - this.newConsumer = builder.build(); - } - - if (previous != null) { - // there was already a new consumer - close it - previous.close(); - } - - } catch (MalformedURLException | GeneralSecurityException e) { - /* - * Since an exception occurred, "consumer" still has its old value, thus it should - * not be closed at this point. - */ - throw new IllegalArgumentException(e); - } + this.consumer.close(); } @Override |