From 1d2c8346e0ac02320ca933b66c1943c7f72343c6 Mon Sep 17 00:00:00 2001 From: Jim Hahn Date: Mon, 26 Mar 2018 16:06:57 -0400 Subject: Support server-side filtering for UEB Cambria client supports server-side filtering. Enhanced the TopicSource classes that use cambria client to support server-side filtering. Updated license date in one of the files. Change-Id: Ia56d73c7a5f3ab960418709c1ea7f1e73aa4ba87 Issue-ID: POLICY-577 Signed-off-by: Jim Hahn --- .../drools/event/comm/FilterableTopicSource.java | 39 ++++++++++++++++++++++ .../event/comm/bus/internal/BusConsumer.java | 39 ++++++++++++++++++++-- .../bus/internal/SingleThreadedBusTopicSource.java | 17 ++++++++-- 3 files changed, 89 insertions(+), 6 deletions(-) create mode 100644 policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/FilterableTopicSource.java diff --git a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/FilterableTopicSource.java b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/FilterableTopicSource.java new file mode 100644 index 00000000..b1e0e1c2 --- /dev/null +++ b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/FilterableTopicSource.java @@ -0,0 +1,39 @@ +/* + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.drools.event.comm; + +/** + * TopicSource that supports server-side filtering. + */ +public interface FilterableTopicSource extends TopicSource { + + /** + * Sets the server-side filter. + * + * @param filter new filter value, or {@code null} + * @throws UnsupportedOperationException if the consumer does not support + * server-side filtering + * @throws IllegalArgumentException if the consumer cannot be built with the + * new filter + */ + public void setFilter(String filter); + +} diff --git a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/BusConsumer.java b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/BusConsumer.java index 984baa79..db240b3d 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/BusConsumer.java +++ b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/BusConsumer.java @@ -60,20 +60,40 @@ public interface BusConsumer { */ public void close(); + /** + * BusConsumer that supports server-side filtering. + */ + public interface FilterableBusConsumer extends BusConsumer { + + /** + * Sets the server-side filter. + * + * @param filter new filter value, or {@code null} + * @throws IllegalArgumentException if the consumer cannot be built with + * the new filter + */ + public void setFilter(String filter); + } + /** * Cambria based consumer */ - public static class CambriaConsumerWrapper implements BusConsumer { + public static class CambriaConsumerWrapper implements FilterableBusConsumer { /** * logger */ private static Logger logger = LoggerFactory.getLogger(CambriaConsumerWrapper.class); + + /** + * Used to build the consumer. + */ + private final ConsumerBuilder builder; /** * Cambria client */ - protected CambriaConsumer consumer; + protected volatile CambriaConsumer consumer; /** * fetch timeout @@ -105,7 +125,7 @@ public interface BusConsumer { this.fetchTimeout = fetchTimeout; - final ConsumerBuilder builder = new CambriaClientBuilders.ConsumerBuilder(); + this.builder = new CambriaClientBuilders.ConsumerBuilder(); if (useHttps) { @@ -157,6 +177,19 @@ public interface BusConsumer { this.consumer.close(); } + @Override + public void setFilter(String filter) { + logger.info("{}: setting DMAAP server-side filter: {}", this, filter); + builder.withServerSideFilter(filter); + + try { + consumer = builder.build(); + + } catch (MalformedURLException | GeneralSecurityException e) { + throw new IllegalArgumentException(e); + } + } + @Override public String toString() { final StringBuilder builder = new StringBuilder(); diff --git a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/SingleThreadedBusTopicSource.java b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/SingleThreadedBusTopicSource.java index b7df8ca3..9b2be6a9 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/SingleThreadedBusTopicSource.java +++ b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/SingleThreadedBusTopicSource.java @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * policy-endpoints * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -27,9 +27,10 @@ import java.util.UUID; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - +import org.onap.policy.drools.event.comm.FilterableTopicSource; import org.onap.policy.drools.event.comm.TopicListener; import org.onap.policy.drools.event.comm.bus.BusTopicSource; +import org.onap.policy.drools.event.comm.bus.internal.BusConsumer.FilterableBusConsumer; /** * This topic source implementation specializes in reading messages @@ -37,7 +38,7 @@ import org.onap.policy.drools.event.comm.bus.BusTopicSource; */ public abstract class SingleThreadedBusTopicSource extends BusTopicBase - implements Runnable, BusTopicSource { + implements Runnable, BusTopicSource, FilterableTopicSource { /** * Not to be converted to PolicyLogger. @@ -286,6 +287,16 @@ public abstract class SingleThreadedBusTopicSource @Override + public void setFilter(String filter) { + if(consumer instanceof FilterableBusConsumer) { + ((FilterableBusConsumer) consumer).setFilter(filter); + + } else { + throw new UnsupportedOperationException("no server-side filtering for topic " + topic); + } + } + + @Override public String toString() { StringBuilder builder = new StringBuilder(); builder.append("SingleThreadedBusTopicSource [consumerGroup=").append(consumerGroup) -- cgit 1.2.3-korg