diff options
5 files changed, 6 insertions, 163 deletions
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/FilterableTopicSource.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/FilterableTopicSource.java deleted file mode 100644 index 27f4ceb7..00000000 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/FilterableTopicSource.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * ONAP - * ================================================================================ - * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.policy.common.endpoints.event.comm; - -/** - * TopicSource that supports server-side filtering. - */ -public interface FilterableTopicSource extends TopicSource { - - /** - * Sets the server-side filter. - * - * @param filter new filter value, or {@code null} - * @throws UnsupportedOperationException if the consumer does not support - * server-side filtering - * @throws IllegalArgumentException if the consumer cannot be built with the - * new filter - */ - public void setFilter(String filter); - -} diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java index 233434f1..60ab2e9e 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java @@ -61,23 +61,9 @@ public interface BusConsumer { public void close(); /** - * BusConsumer that supports server-side filtering. - */ - public interface FilterableBusConsumer extends BusConsumer { - - /** - * Sets the server-side filter. - * - * @param filter new filter value, or {@code null} - * @throws IllegalArgumentException if the consumer cannot be built with the new filter - */ - public void setFilter(String filter); - } - - /** * Cambria based consumer. */ - public static class CambriaConsumerWrapper implements FilterableBusConsumer { + public static class CambriaConsumerWrapper implements BusConsumer { /** * logger. @@ -90,19 +76,9 @@ public interface BusConsumer { private final ConsumerBuilder builder; /** - * Locked while updating {@link #consumer} and {@link #newConsumer}. - */ - private final Object consLocker = new Object(); - - /** * Cambria client. */ - private CambriaConsumer consumer; - - /** - * Cambria client to use for next fetch. - */ - private CambriaConsumer newConsumer = null; + private final CambriaConsumer consumer; /** * fetch timeout. @@ -169,7 +145,7 @@ public interface BusConsumer { @Override public Iterable<String> fetch() throws IOException { try { - return getCurrentConsumer().fetch(); + return this.consumer.fetch(); } catch (final IOException e) { //NOSONAR logger.error("{}: cannot fetch because of {} - backoff for {} ms.", this, e.getMessage(), this.fetchTimeout); @@ -191,55 +167,7 @@ public interface BusConsumer { @Override public void close() { this.closeCondition.countDown(); - getCurrentConsumer().close(); - } - - private CambriaConsumer getCurrentConsumer() { - CambriaConsumer old = null; - CambriaConsumer ret; - - synchronized (consLocker) { - if (this.newConsumer != null) { - // replace old consumer with new consumer - old = this.consumer; - this.consumer = this.newConsumer; - this.newConsumer = null; - } - - ret = this.consumer; - } - - if (old != null) { - old.close(); - } - - return ret; - } - - @Override - public void setFilter(String filter) { - logger.info("{}: setting DMAAP server-side filter: {}", this, filter); - builder.withServerSideFilter(filter); - - try { - CambriaConsumer previous; - synchronized (consLocker) { - previous = this.newConsumer; - this.newConsumer = builder.build(); - } - - if (previous != null) { - // there was already a new consumer - close it - previous.close(); - } - - } catch (MalformedURLException | GeneralSecurityException e) { - /* - * Since an exception occurred, "consumer" still has its old value, thus it should - * not be closed at this point. - */ - throw new IllegalArgumentException(e); - } + this.consumer.close(); } @Override diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedBusTopicSource.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedBusTopicSource.java index e52204f6..6e746948 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedBusTopicSource.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedBusTopicSource.java @@ -24,10 +24,8 @@ package org.onap.policy.common.endpoints.event.comm.bus.internal; import java.io.IOException; import java.net.MalformedURLException; import java.util.UUID; -import org.onap.policy.common.endpoints.event.comm.FilterableTopicSource; import org.onap.policy.common.endpoints.event.comm.TopicListener; import org.onap.policy.common.endpoints.event.comm.bus.BusTopicSource; -import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer.FilterableBusConsumer; import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties; import org.onap.policy.common.endpoints.utils.NetLoggerUtil; import org.onap.policy.common.endpoints.utils.NetLoggerUtil.EventType; @@ -40,7 +38,7 @@ import org.slf4j.LoggerFactory; * notifying its listeners. */ public abstract class SingleThreadedBusTopicSource extends BusTopicBase - implements Runnable, BusTopicSource, FilterableTopicSource { + implements Runnable, BusTopicSource { /** * Not to be converted to PolicyLogger. This will contain all instract /out traffic and only @@ -262,17 +260,6 @@ public abstract class SingleThreadedBusTopicSource extends BusTopicBase return broadcast(event); } - - @Override - public void setFilter(String filter) { - if (consumer instanceof FilterableBusConsumer) { - ((FilterableBusConsumer) consumer).setFilter(filter); - - } else { - throw new UnsupportedOperationException("no server-side filtering for topic " + topic); - } - } - @Override public String toString() { return "SingleThreadedBusTopicSource [consumerGroup=" + consumerGroup + ", consumerInstance=" + consumerInstance diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumerTest.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumerTest.java index 5264b2f4..82d5eef8 100644 --- a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumerTest.java +++ b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumerTest.java @@ -100,23 +100,7 @@ public class BusConsumerTest extends TopicTestBase { @Test public void testCambriaConsumerWrapperClose() { CambriaConsumerWrapper cons = new CambriaConsumerWrapper(builder.build()); - - // set filter several times to cause different branches of close() to be executed - for (int count = 0; count < 3; ++count) { - cons.close(); - final int count2 = count; - assertThatCode(() -> cons.setFilter("close=" + count2)).doesNotThrowAnyException(); - } - } - - @Test - public void testCambriaConsumerWrapperSetFilter() { - // set filter several times to cause different branches to be executed - CambriaConsumerWrapper cons = new CambriaConsumerWrapper(builder.build()); - for (int count = 0; count < 3; ++count) { - final int count2 = count; - assertThatCode(() -> cons.setFilter("set-filter=" + count2)).doesNotThrowAnyException(); - } + assertThatCode(() -> cons.close()).doesNotThrowAnyException(); } @Test diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedBusTopicSourceTest.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedBusTopicSourceTest.java index 1e95924d..1a5506de 100644 --- a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedBusTopicSourceTest.java +++ b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedBusTopicSourceTest.java @@ -43,7 +43,6 @@ import org.mockito.stubbing.Answer; import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure; import org.onap.policy.common.endpoints.event.comm.TopicListener; import org.onap.policy.common.endpoints.event.comm.bus.TopicTestBase; -import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer.FilterableBusConsumer; import org.onap.policy.common.utils.gson.GsonTestUtils; public class SingleThreadedBusTopicSourceTest extends TopicTestBase { @@ -287,22 +286,6 @@ public class SingleThreadedBusTopicSourceTest extends TopicTestBase { } @Test - public void testSetFilter() { - FilterableBusConsumer filt = mock(FilterableBusConsumer.class); - cons = filt; - - source.start(); - source.setFilter("my-filter"); - verify(filt).setFilter("my-filter"); - } - - @Test(expected = UnsupportedOperationException.class) - public void testSetFilter_Unsupported() { - source.start(); - source.setFilter("unsupported-filter"); - } - - @Test public void testGetConsumerGroup() { assertEquals(MY_CONS_GROUP, source.getConsumerGroup()); } |