diff options
author | Jorge Hernandez <jh1730@att.com> | 2018-03-27 15:10:33 +0000 |
---|---|---|
committer | Gerrit Code Review <gerrit@onap.org> | 2018-03-27 15:10:33 +0000 |
commit | 076d282ba5b3cfef57aec37a7518c1e3ccffd65a (patch) | |
tree | b55ffd5b1f7f37f1062dc04597c329c348dc98bc | |
parent | 13e9aa8e0613877c3ce63c878978e9cb31b92c48 (diff) | |
parent | 1d2c8346e0ac02320ca933b66c1943c7f72343c6 (diff) |
Merge "Support server-side filtering for UEB"
3 files changed, 89 insertions, 6 deletions
diff --git a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/FilterableTopicSource.java b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/FilterableTopicSource.java new file mode 100644 index 00000000..b1e0e1c2 --- /dev/null +++ b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/FilterableTopicSource.java @@ -0,0 +1,39 @@ +/* + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.drools.event.comm; + +/** + * TopicSource that supports server-side filtering. + */ +public interface FilterableTopicSource extends TopicSource { + + /** + * Sets the server-side filter. + * + * @param filter new filter value, or {@code null} + * @throws UnsupportedOperationException if the consumer does not support + * server-side filtering + * @throws IllegalArgumentException if the consumer cannot be built with the + * new filter + */ + public void setFilter(String filter); + +} diff --git a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/BusConsumer.java b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/BusConsumer.java index 984baa79..db240b3d 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/BusConsumer.java +++ b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/BusConsumer.java @@ -61,19 +61,39 @@ public interface BusConsumer { public void close(); /** + * BusConsumer that supports server-side filtering. + */ + public interface FilterableBusConsumer extends BusConsumer { + + /** + * Sets the server-side filter. + * + * @param filter new filter value, or {@code null} + * @throws IllegalArgumentException if the consumer cannot be built with + * the new filter + */ + public void setFilter(String filter); + } + + /** * Cambria based consumer */ - public static class CambriaConsumerWrapper implements BusConsumer { + public static class CambriaConsumerWrapper implements FilterableBusConsumer { /** * logger */ private static Logger logger = LoggerFactory.getLogger(CambriaConsumerWrapper.class); + + /** + * Used to build the consumer. + */ + private final ConsumerBuilder builder; /** * Cambria client */ - protected CambriaConsumer consumer; + protected volatile CambriaConsumer consumer; /** * fetch timeout @@ -105,7 +125,7 @@ public interface BusConsumer { this.fetchTimeout = fetchTimeout; - final ConsumerBuilder builder = new CambriaClientBuilders.ConsumerBuilder(); + this.builder = new CambriaClientBuilders.ConsumerBuilder(); if (useHttps) { @@ -158,6 +178,19 @@ public interface BusConsumer { } @Override + public void setFilter(String filter) { + logger.info("{}: setting DMAAP server-side filter: {}", this, filter); + builder.withServerSideFilter(filter); + + try { + consumer = builder.build(); + + } catch (MalformedURLException | GeneralSecurityException e) { + throw new IllegalArgumentException(e); + } + } + + @Override public String toString() { final StringBuilder builder = new StringBuilder(); builder.append("CambriaConsumerWrapper [fetchTimeout=").append(fetchTimeout).append("]"); diff --git a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/SingleThreadedBusTopicSource.java b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/SingleThreadedBusTopicSource.java index b7df8ca3..9b2be6a9 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/SingleThreadedBusTopicSource.java +++ b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/SingleThreadedBusTopicSource.java @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * policy-endpoints * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -27,9 +27,10 @@ import java.util.UUID; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - +import org.onap.policy.drools.event.comm.FilterableTopicSource; import org.onap.policy.drools.event.comm.TopicListener; import org.onap.policy.drools.event.comm.bus.BusTopicSource; +import org.onap.policy.drools.event.comm.bus.internal.BusConsumer.FilterableBusConsumer; /** * This topic source implementation specializes in reading messages @@ -37,7 +38,7 @@ import org.onap.policy.drools.event.comm.bus.BusTopicSource; */ public abstract class SingleThreadedBusTopicSource extends BusTopicBase - implements Runnable, BusTopicSource { + implements Runnable, BusTopicSource, FilterableTopicSource { /** * Not to be converted to PolicyLogger. @@ -286,6 +287,16 @@ public abstract class SingleThreadedBusTopicSource @Override + public void setFilter(String filter) { + if(consumer instanceof FilterableBusConsumer) { + ((FilterableBusConsumer) consumer).setFilter(filter); + + } else { + throw new UnsupportedOperationException("no server-side filtering for topic " + topic); + } + } + + @Override public String toString() { StringBuilder builder = new StringBuilder(); builder.append("SingleThreadedBusTopicSource [consumerGroup=").append(consumerGroup) |