aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/FilterableTopicSource.java39
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/BusConsumer.java39
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/SingleThreadedBusTopicSource.java17
3 files changed, 89 insertions, 6 deletions
diff --git a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/FilterableTopicSource.java b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/FilterableTopicSource.java
new file mode 100644
index 00000000..b1e0e1c2
--- /dev/null
+++ b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/FilterableTopicSource.java
@@ -0,0 +1,39 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.event.comm;
+
+/**
+ * TopicSource that supports server-side filtering.
+ */
+public interface FilterableTopicSource extends TopicSource {
+
+ /**
+ * Sets the server-side filter.
+ *
+ * @param filter new filter value, or {@code null}
+ * @throws UnsupportedOperationException if the consumer does not support
+ * server-side filtering
+ * @throws IllegalArgumentException if the consumer cannot be built with the
+ * new filter
+ */
+ public void setFilter(String filter);
+
+}
diff --git a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/BusConsumer.java b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/BusConsumer.java
index 984baa79..db240b3d 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/BusConsumer.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/BusConsumer.java
@@ -61,19 +61,39 @@ public interface BusConsumer {
public void close();
/**
+ * BusConsumer that supports server-side filtering.
+ */
+ public interface FilterableBusConsumer extends BusConsumer {
+
+ /**
+ * Sets the server-side filter.
+ *
+ * @param filter new filter value, or {@code null}
+ * @throws IllegalArgumentException if the consumer cannot be built with
+ * the new filter
+ */
+ public void setFilter(String filter);
+ }
+
+ /**
* Cambria based consumer
*/
- public static class CambriaConsumerWrapper implements BusConsumer {
+ public static class CambriaConsumerWrapper implements FilterableBusConsumer {
/**
* logger
*/
private static Logger logger = LoggerFactory.getLogger(CambriaConsumerWrapper.class);
+
+ /**
+ * Used to build the consumer.
+ */
+ private final ConsumerBuilder builder;
/**
* Cambria client
*/
- protected CambriaConsumer consumer;
+ protected volatile CambriaConsumer consumer;
/**
* fetch timeout
@@ -105,7 +125,7 @@ public interface BusConsumer {
this.fetchTimeout = fetchTimeout;
- final ConsumerBuilder builder = new CambriaClientBuilders.ConsumerBuilder();
+ this.builder = new CambriaClientBuilders.ConsumerBuilder();
if (useHttps) {
@@ -158,6 +178,19 @@ public interface BusConsumer {
}
@Override
+ public void setFilter(String filter) {
+ logger.info("{}: setting DMAAP server-side filter: {}", this, filter);
+ builder.withServerSideFilter(filter);
+
+ try {
+ consumer = builder.build();
+
+ } catch (MalformedURLException | GeneralSecurityException e) {
+ throw new IllegalArgumentException(e);
+ }
+ }
+
+ @Override
public String toString() {
final StringBuilder builder = new StringBuilder();
builder.append("CambriaConsumerWrapper [fetchTimeout=").append(fetchTimeout).append("]");
diff --git a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/SingleThreadedBusTopicSource.java b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/SingleThreadedBusTopicSource.java
index b7df8ca3..9b2be6a9 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/SingleThreadedBusTopicSource.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/SingleThreadedBusTopicSource.java
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* policy-endpoints
* ================================================================================
- * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -27,9 +27,10 @@ import java.util.UUID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
+import org.onap.policy.drools.event.comm.FilterableTopicSource;
import org.onap.policy.drools.event.comm.TopicListener;
import org.onap.policy.drools.event.comm.bus.BusTopicSource;
+import org.onap.policy.drools.event.comm.bus.internal.BusConsumer.FilterableBusConsumer;
/**
* This topic source implementation specializes in reading messages
@@ -37,7 +38,7 @@ import org.onap.policy.drools.event.comm.bus.BusTopicSource;
*/
public abstract class SingleThreadedBusTopicSource
extends BusTopicBase
- implements Runnable, BusTopicSource {
+ implements Runnable, BusTopicSource, FilterableTopicSource {
/**
* Not to be converted to PolicyLogger.
@@ -286,6 +287,16 @@ public abstract class SingleThreadedBusTopicSource
@Override
+ public void setFilter(String filter) {
+ if(consumer instanceof FilterableBusConsumer) {
+ ((FilterableBusConsumer) consumer).setFilter(filter);
+
+ } else {
+ throw new UnsupportedOperationException("no server-side filtering for topic " + topic);
+ }
+ }
+
+ @Override
public String toString() {
StringBuilder builder = new StringBuilder();
builder.append("SingleThreadedBusTopicSource [consumerGroup=").append(consumerGroup)