aboutsummaryrefslogtreecommitdiffstats
path: root/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java
diff options
context:
space:
mode:
authorJim Hahn <jrh3@att.com>2020-10-29 16:07:19 -0400
committerJim Hahn <jrh3@att.com>2020-10-29 16:08:23 -0400
commit7b1f4e95ccd02ec4b49c8e7da558f2b6f20d48e1 (patch)
treec00b049a50f56fa0d471ddb97c3e0cf33bd52b3f /policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java
parenta9c3b525fb3f37e56ccf143f35f739fb60024b0e (diff)
Remove server-side filtering from policy-endpoints
ONAP DMaaP Message Router no longer supports server-side filtering. Removed it from policy-endpoints. Issue-ID: POLICY-2881 Change-Id: I08157f7699608af63992dec78a61c5f9c55037b9 Signed-off-by: Jim Hahn <jrh3@att.com>
Diffstat (limited to 'policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java')
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java80
1 files changed, 4 insertions, 76 deletions
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java
index 233434f1..60ab2e9e 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java
@@ -61,23 +61,9 @@ public interface BusConsumer {
public void close();
/**
- * BusConsumer that supports server-side filtering.
- */
- public interface FilterableBusConsumer extends BusConsumer {
-
- /**
- * Sets the server-side filter.
- *
- * @param filter new filter value, or {@code null}
- * @throws IllegalArgumentException if the consumer cannot be built with the new filter
- */
- public void setFilter(String filter);
- }
-
- /**
* Cambria based consumer.
*/
- public static class CambriaConsumerWrapper implements FilterableBusConsumer {
+ public static class CambriaConsumerWrapper implements BusConsumer {
/**
* logger.
@@ -90,19 +76,9 @@ public interface BusConsumer {
private final ConsumerBuilder builder;
/**
- * Locked while updating {@link #consumer} and {@link #newConsumer}.
- */
- private final Object consLocker = new Object();
-
- /**
* Cambria client.
*/
- private CambriaConsumer consumer;
-
- /**
- * Cambria client to use for next fetch.
- */
- private CambriaConsumer newConsumer = null;
+ private final CambriaConsumer consumer;
/**
* fetch timeout.
@@ -169,7 +145,7 @@ public interface BusConsumer {
@Override
public Iterable<String> fetch() throws IOException {
try {
- return getCurrentConsumer().fetch();
+ return this.consumer.fetch();
} catch (final IOException e) { //NOSONAR
logger.error("{}: cannot fetch because of {} - backoff for {} ms.", this, e.getMessage(),
this.fetchTimeout);
@@ -191,55 +167,7 @@ public interface BusConsumer {
@Override
public void close() {
this.closeCondition.countDown();
- getCurrentConsumer().close();
- }
-
- private CambriaConsumer getCurrentConsumer() {
- CambriaConsumer old = null;
- CambriaConsumer ret;
-
- synchronized (consLocker) {
- if (this.newConsumer != null) {
- // replace old consumer with new consumer
- old = this.consumer;
- this.consumer = this.newConsumer;
- this.newConsumer = null;
- }
-
- ret = this.consumer;
- }
-
- if (old != null) {
- old.close();
- }
-
- return ret;
- }
-
- @Override
- public void setFilter(String filter) {
- logger.info("{}: setting DMAAP server-side filter: {}", this, filter);
- builder.withServerSideFilter(filter);
-
- try {
- CambriaConsumer previous;
- synchronized (consLocker) {
- previous = this.newConsumer;
- this.newConsumer = builder.build();
- }
-
- if (previous != null) {
- // there was already a new consumer - close it
- previous.close();
- }
-
- } catch (MalformedURLException | GeneralSecurityException e) {
- /*
- * Since an exception occurred, "consumer" still has its old value, thus it should
- * not be closed at this point.
- */
- throw new IllegalArgumentException(e);
- }
+ this.consumer.close();
}
@Override