diff options
Diffstat (limited to 'policy-endpoints')
9 files changed, 1280 insertions, 11 deletions
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSinkFactory.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSinkFactory.java index 08e8dfe8..09078720 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSinkFactory.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSinkFactory.java @@ -150,7 +150,7 @@ class IndexedDmaapTopicSinkFactory implements DmaapTopicSinkFactory { return dmaapTopicWriters.get(busTopicParams.getTopic()); } - DmaapTopicSink dmaapTopicSink = new InlineDmaapTopicSink(busTopicParams); + DmaapTopicSink dmaapTopicSink = makeSink(busTopicParams); if (busTopicParams.isManaged()) { dmaapTopicWriters.put(busTopicParams.getTopic(), dmaapTopicSink); @@ -210,7 +210,7 @@ class IndexedDmaapTopicSinkFactory implements DmaapTopicSinkFactory { final String partitionKey = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SINK_PARTITION_KEY_SUFFIX); - final String managedString = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS + final String managedString = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + topic + PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX); /* DME2 Properties */ @@ -343,6 +343,16 @@ class IndexedDmaapTopicSinkFactory implements DmaapTopicSinkFactory { } } + /** + * Makes a new sink. + * + * @param busTopicParams parameters to use to configure the sink + * @return a new sink + */ + protected DmaapTopicSink makeSink(BusTopicParams busTopicParams) { + return new InlineDmaapTopicSink(busTopicParams); + } + @Override public void destroy(String topic) { @@ -385,7 +395,7 @@ class IndexedDmaapTopicSinkFactory implements DmaapTopicSinkFactory { if (dmaapTopicWriters.containsKey(topic)) { return dmaapTopicWriters.get(topic); } else { - throw new IllegalStateException("DmaapTopicSink for " + topic + " not found"); + throw new IllegalArgumentException("DmaapTopicSink for " + topic + " not found"); } } } @@ -397,9 +407,7 @@ class IndexedDmaapTopicSinkFactory implements DmaapTopicSinkFactory { @Override public String toString() { - StringBuilder builder = new StringBuilder(); - builder.append("IndexedDmaapTopicSinkFactory []"); - return builder.toString(); + return "IndexedDmaapTopicSinkFactory []"; } } diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSourceFactory.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSourceFactory.java index d5e04d50..f45164f8 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSourceFactory.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSourceFactory.java @@ -152,8 +152,7 @@ class IndexedDmaapTopicSourceFactory implements DmaapTopicSourceFactory { return dmaapTopicSources.get(busTopicParams.getTopic()); } - DmaapTopicSource dmaapTopicSource = - new SingleThreadedDmaapTopicSource(busTopicParams); + DmaapTopicSource dmaapTopicSource = makeSource(busTopicParams); if (busTopicParams.isManaged()) { dmaapTopicSources.put(busTopicParams.getTopic(), dmaapTopicSource); @@ -396,6 +395,16 @@ class IndexedDmaapTopicSourceFactory implements DmaapTopicSourceFactory { return this.build(servers, topic, null, null); } + /** + * Makes a new source. + * + * @param busTopicParams parameters to use to configure the source + * @return a new source + */ + protected DmaapTopicSource makeSource(BusTopicParams busTopicParams) { + return new SingleThreadedDmaapTopicSource(busTopicParams); + } + @Override public void destroy(String topic) { @@ -451,9 +460,7 @@ class IndexedDmaapTopicSourceFactory implements DmaapTopicSourceFactory { @Override public String toString() { - StringBuilder builder = new StringBuilder(); - builder.append("IndexedDmaapTopicSourceFactory []"); - return builder.toString(); + return "IndexedDmaapTopicSourceFactory []"; } } diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicPropertyBuilder.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicPropertyBuilder.java new file mode 100644 index 00000000..8ca22ebe --- /dev/null +++ b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicPropertyBuilder.java @@ -0,0 +1,116 @@ +/* + * ============LICENSE_START======================================================= + * ONAP Policy Engine - Common Modules + * ================================================================================ + * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.common.endpoints.event.comm.bus; + +import static org.onap.policy.common.endpoints.event.comm.bus.BusTopicTestBase.MY_AFT_ENV; +import static org.onap.policy.common.endpoints.event.comm.bus.BusTopicTestBase.MY_API_KEY; +import static org.onap.policy.common.endpoints.event.comm.bus.BusTopicTestBase.MY_API_SECRET; +import static org.onap.policy.common.endpoints.event.comm.bus.BusTopicTestBase.MY_ENV; +import static org.onap.policy.common.endpoints.event.comm.bus.BusTopicTestBase.MY_FETCH_LIMIT; +import static org.onap.policy.common.endpoints.event.comm.bus.BusTopicTestBase.MY_FETCH_TIMEOUT; +import static org.onap.policy.common.endpoints.event.comm.bus.BusTopicTestBase.MY_LAT; +import static org.onap.policy.common.endpoints.event.comm.bus.BusTopicTestBase.MY_LONG; +import static org.onap.policy.common.endpoints.event.comm.bus.BusTopicTestBase.MY_PARTITION; +import static org.onap.policy.common.endpoints.event.comm.bus.BusTopicTestBase.MY_PARTNER; +import static org.onap.policy.common.endpoints.event.comm.bus.BusTopicTestBase.MY_ROUTE; +import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_ALLOW_SELF_SIGNED_CERTIFICATES_SUFFIX; +import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX; +import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX; +import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_DMAAP_DME2_EP_CONN_TIMEOUT_SUFFIX; +import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_DMAAP_DME2_EP_READ_TIMEOUT_MS_SUFFIX; +import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX; +import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX; +import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX; +import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUNDTRIP_TIMEOUT_MS_SUFFIX; +import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX; +import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_DMAAP_DME2_SESSION_STICKINESS_REQUIRED_SUFFIX; +import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_DMAAP_DME2_SUB_CONTEXT_PATH_SUFFIX; +import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_DMAAP_DME2_VERSION_SUFFIX; +import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_HTTP_HTTPS_SUFFIX; +import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX; +import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_TOPIC_AAF_MECHID_SUFFIX; +import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_TOPIC_AAF_PASSWORD_SUFFIX; +import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_TOPIC_API_KEY_SUFFIX; +import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_TOPIC_API_SECRET_SUFFIX; +import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX; +import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_TOPIC_SINK_PARTITION_KEY_SUFFIX; +import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_FETCH_LIMIT_SUFFIX; +import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_FETCH_TIMEOUT_SUFFIX; + +public class DmaapTopicPropertyBuilder extends TopicPropertyBuilder { + + public static final String SERVER = "my-server"; + public static final String TOPIC2 = "my-topic-2"; + + public static final String MY_CONN_TIMEOUT = "200"; + public static final String MY_READ_TIMEOUT = "201"; + public static final String MY_ROUNDTRIP_TIMEOUT = "202"; + public static final String MY_STICKINESS = "true"; + public static final String MY_SUBCONTEXT = "my-subcontext"; + public static final String MY_DME_VERSION = "my-version"; + public static final String MY_AAF_MECHID = "my-aaf-mechid"; + public static final String MY_AAF_PASSWD = "my-aaf-passwd"; + + /** + * Constructs the object. + * + * @param prefix the prefix for the properties to be built + */ + public DmaapTopicPropertyBuilder(String prefix) { + super(prefix); + } + + /** + * Adds a topic and configures it's properties with default values. + * + * @param topic the topic to be added + * @return this builder + */ + public DmaapTopicPropertyBuilder makeTopic(String topic) { + addTopic(topic); + + setTopicProperty(PROPERTY_MANAGED_SUFFIX, "true"); + setTopicProperty(PROPERTY_HTTP_HTTPS_SUFFIX, "true"); + setTopicProperty(PROPERTY_TOPIC_AAF_MECHID_SUFFIX, MY_AAF_MECHID); + setTopicProperty(PROPERTY_TOPIC_AAF_PASSWORD_SUFFIX, MY_AAF_PASSWD); + setTopicProperty(PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX, MY_AFT_ENV); + setTopicProperty(PROPERTY_ALLOW_SELF_SIGNED_CERTIFICATES_SUFFIX, "true"); + setTopicProperty(PROPERTY_TOPIC_API_KEY_SUFFIX, MY_API_KEY); + setTopicProperty(PROPERTY_TOPIC_API_SECRET_SUFFIX, MY_API_SECRET); + setTopicProperty(PROPERTY_TOPIC_SOURCE_FETCH_LIMIT_SUFFIX, MY_FETCH_LIMIT); + setTopicProperty(PROPERTY_TOPIC_SOURCE_FETCH_TIMEOUT_SUFFIX, MY_FETCH_TIMEOUT); + setTopicProperty(PROPERTY_DMAAP_DME2_EP_CONN_TIMEOUT_SUFFIX, MY_CONN_TIMEOUT); + setTopicProperty(PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX, MY_ENV); + setTopicProperty(PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX, MY_LAT); + setTopicProperty(PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX, MY_LONG); + setTopicProperty(PROPERTY_TOPIC_SINK_PARTITION_KEY_SUFFIX, MY_PARTITION); + setTopicProperty(PROPERTY_DMAAP_DME2_PARTNER_SUFFIX, MY_PARTNER); + setTopicProperty(PROPERTY_DMAAP_DME2_EP_READ_TIMEOUT_MS_SUFFIX, MY_READ_TIMEOUT); + setTopicProperty(PROPERTY_DMAAP_DME2_ROUNDTRIP_TIMEOUT_MS_SUFFIX, MY_ROUNDTRIP_TIMEOUT); + setTopicProperty(PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX, MY_ROUTE); + setTopicProperty(PROPERTY_TOPIC_SERVERS_SUFFIX, SERVER); + setTopicProperty(PROPERTY_DMAAP_DME2_SESSION_STICKINESS_REQUIRED_SUFFIX, MY_STICKINESS); + setTopicProperty(PROPERTY_DMAAP_DME2_SUB_CONTEXT_PATH_SUFFIX, MY_SUBCONTEXT); + setTopicProperty(PROPERTY_DMAAP_DME2_VERSION_SUFFIX, MY_DME_VERSION); + + return this; + } +} diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSinkFactoryTest.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSinkFactoryTest.java new file mode 100644 index 00000000..a9084764 --- /dev/null +++ b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSinkFactoryTest.java @@ -0,0 +1,403 @@ +/* + * ============LICENSE_START======================================================= + * policy-endpoints + * ================================================================================ + * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.common.endpoints.event.comm.bus; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_ALLOW_SELF_SIGNED_CERTIFICATES_SUFFIX; +import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_DMAAP_DME2_EP_CONN_TIMEOUT_SUFFIX; +import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_DMAAP_DME2_EP_READ_TIMEOUT_MS_SUFFIX; +import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUNDTRIP_TIMEOUT_MS_SUFFIX; +import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX; +import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_DMAAP_DME2_SESSION_STICKINESS_REQUIRED_SUFFIX; +import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_DMAAP_DME2_SUB_CONTEXT_PATH_SUFFIX; +import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_DMAAP_DME2_VERSION_SUFFIX; +import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS; +import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_HTTP_HTTPS_SUFFIX; +import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX; +import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX; + +import java.util.Arrays; +import java.util.Collections; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.function.Function; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams; + +public class DmaapTopicSinkFactoryTest extends BusTopicTestBase { + + private static final String SERVER = "my-server"; + private static final String TOPIC2 = "my-topic-2"; + + private static final String MY_CONN_TIMEOUT = "200"; + private static final String MY_READ_TIMEOUT = "201"; + private static final String MY_ROUNDTRIP_TIMEOUT = "202"; + private static final String MY_STICKINESS = "true"; + private static final String MY_SUBCONTEXT = "my-subcontext"; + private static final String MY_DME_VERSION = "my-version"; + + private SinkFactory factory; + + /** + * Creates the object to be tested. + */ + @Before + public void setUp() { + super.setUp(); + + factory = new SinkFactory(); + } + + @After + public void tearDown() { + factory.destroy(); + } + + @Test + public void testBuildBusTopicParams() { + // two unmanaged topics + DmaapTopicSink sink = factory.build(makeBuilder().managed(false).build()); + DmaapTopicSink sink2 = factory.build(makeBuilder().managed(false).topic(TOPIC2).build()); + assertNotNull(sink); + assertNotNull(sink2); + assertTrue(sink != sink2); + + // duplicate topics, but since they aren't managed, they should be different + DmaapTopicSink sink3 = factory.build(makeBuilder().managed(false).build()); + DmaapTopicSink sink4 = factory.build(makeBuilder().managed(false).build()); + assertNotNull(sink3); + assertNotNull(sink4); + assertTrue(sink != sink3); + assertTrue(sink != sink4); + assertTrue(sink3 != sink4); + + // two managed topics + DmaapTopicSink sink5 = factory.build(makeBuilder().build()); + DmaapTopicSink sink6 = factory.build(makeBuilder().topic(TOPIC2).build()); + assertNotNull(sink5); + assertNotNull(sink6); + + // re-build same managed topics - should get exact same objects + assertTrue(sink5 == factory.build(BusTopicParams.builder().topic(MY_TOPIC).build())); + assertTrue(sink6 == factory.build(makeBuilder().topic(TOPIC2).build())); + } + + @Test(expected = IllegalArgumentException.class) + public void testBuildBusTopicParams_NullTopic() { + factory.build(makeBuilder().topic(null).build()); + } + + @Test(expected = IllegalArgumentException.class) + public void testBuildBusTopicParams_EmptyTopic() { + factory.build(makeBuilder().topic("").build()); + } + + @Test + public void testBuildListOfStringString() { + DmaapTopicSink sink1 = factory.build(servers, MY_TOPIC); + assertNotNull(sink1); + + // check parameters that were used + BusTopicParams params = factory.params.get(0); + assertEquals(servers, params.getServers()); + assertEquals(MY_TOPIC, params.getTopic()); + assertEquals(true, params.isManaged()); + assertEquals(false, params.isUseHttps()); + assertEquals(false, params.isAllowSelfSignedCerts()); + + DmaapTopicSink sink2 = factory.build(servers, TOPIC2); + assertNotNull(sink2); + assertTrue(sink1 != sink2); + + // duplicate - should be the same as these topics are managed + DmaapTopicSink sink3 = factory.build(Collections.emptyList(), TOPIC2); + assertTrue(sink2 == sink3); + } + + @Test + public void testBuildProperties() { + assertEquals(1, factory.build(makePropBuilder().makeTopic(MY_TOPIC).build()).size()); + + BusTopicParams params = factory.params.get(0); + assertEquals(true, params.isManaged()); + assertEquals(true, params.isUseHttps()); + assertEquals(true, params.isAllowSelfSignedCerts()); + assertEquals(MY_API_KEY, params.getApiKey()); + assertEquals(MY_API_SECRET, params.getApiSecret()); + assertEquals(MY_ENV, params.getEnvironment()); + assertEquals(MY_LAT, params.getLatitude()); + assertEquals(MY_LONG, params.getLongitude()); + assertEquals(MY_PARTITION, params.getPartitionId()); + assertEquals(MY_PARTNER, params.getPartner()); + assertEquals(Arrays.asList(SERVER), params.getServers()); + assertEquals(MY_TOPIC, params.getTopic()); + + Map<String, String> add = params.getAdditionalProps(); + assertEquals(MY_CONN_TIMEOUT, add.get(DmaapTopicSinkFactory.DME2_EP_CONN_TIMEOUT_PROPERTY)); + assertEquals(MY_READ_TIMEOUT, add.get(DmaapTopicSinkFactory.DME2_READ_TIMEOUT_PROPERTY)); + assertEquals(MY_ROUNDTRIP_TIMEOUT, add.get(DmaapTopicSinkFactory.DME2_ROUNDTRIP_TIMEOUT_PROPERTY)); + assertEquals(MY_ROUTE, add.get(DmaapTopicSinkFactory.DME2_ROUTE_OFFER_PROPERTY)); + assertEquals(MY_STICKINESS, add.get(DmaapTopicSinkFactory.DME2_SESSION_STICKINESS_REQUIRED_PROPERTY)); + assertEquals(MY_SUBCONTEXT, add.get(DmaapTopicSinkFactory.DME2_SUBCONTEXT_PATH_PROPERTY)); + assertEquals(MY_DME_VERSION, add.get(DmaapTopicSinkFactory.DME2_VERSION_PROPERTY)); + } + + @Test + public void testBuildProperties_Variations() { + TopicPropertyBuilder builder = makePropBuilder().makeTopic(MY_TOPIC); + + // null sinks + Properties props = builder.build(); + props.remove(PROPERTY_DMAAP_SINK_TOPICS); + assertTrue(factory.build(props).isEmpty()); + + // empty sinks + props = builder.build(); + props.setProperty(PROPERTY_DMAAP_SINK_TOPICS, ""); + assertTrue(factory.build(props).isEmpty()); + + // null servers + assertTrue(factory.build(makePropBuilder().makeTopic(MY_TOPIC) + .removeTopicProperty(PROPERTY_TOPIC_SERVERS_SUFFIX).build()).isEmpty()); + + // empty servers + assertTrue(factory.build(makePropBuilder().makeTopic(MY_TOPIC) + .removeTopicProperty(PROPERTY_TOPIC_SERVERS_SUFFIX).build()).isEmpty()); + + // check boolean properties that default to true + checkDefault(builder, PROPERTY_MANAGED_SUFFIX, BusTopicParams::isManaged); + + // check boolean properties that default to false + checkDefault(builder, PROPERTY_HTTP_HTTPS_SUFFIX, params -> ! params.isUseHttps()); + checkDefault(builder, PROPERTY_ALLOW_SELF_SIGNED_CERTIFICATES_SUFFIX, + params -> ! params.isAllowSelfSignedCerts()); + + // check "additional" properties + expectNullAddProp(builder, PROPERTY_DMAAP_DME2_EP_CONN_TIMEOUT_SUFFIX, + DmaapTopicSinkFactory.DME2_EP_CONN_TIMEOUT_PROPERTY); + + expectNullAddProp(builder, PROPERTY_DMAAP_DME2_EP_READ_TIMEOUT_MS_SUFFIX, + DmaapTopicSinkFactory.DME2_READ_TIMEOUT_PROPERTY); + + expectNullAddProp(builder, PROPERTY_DMAAP_DME2_ROUNDTRIP_TIMEOUT_MS_SUFFIX, + DmaapTopicSinkFactory.DME2_ROUNDTRIP_TIMEOUT_PROPERTY); + + expectNullAddProp(builder, PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX, + DmaapTopicSinkFactory.DME2_ROUTE_OFFER_PROPERTY); + + expectNullAddProp(builder, PROPERTY_DMAAP_DME2_SESSION_STICKINESS_REQUIRED_SUFFIX, + DmaapTopicSinkFactory.DME2_SESSION_STICKINESS_REQUIRED_PROPERTY); + + expectNullAddProp(builder, PROPERTY_DMAAP_DME2_SUB_CONTEXT_PATH_SUFFIX, + DmaapTopicSinkFactory.DME2_SUBCONTEXT_PATH_PROPERTY); + + expectNullAddProp(builder, PROPERTY_DMAAP_DME2_VERSION_SUFFIX, DmaapTopicSinkFactory.DME2_VERSION_PROPERTY); + } + + @Test + public void testBuildProperties_Multiple() { + TopicPropertyBuilder builder = + makePropBuilder().makeTopic(MY_TOPIC).makeTopic(TOPIC2).addTopic(MY_TOPIC).addTopic(MY_TOPIC); + + List<DmaapTopicSink> lst = factory.build(builder.build()); + assertEquals(4, lst.size()); + + int index = 0; + DmaapTopicSink sink = lst.get(index++); + assertTrue(sink != lst.get(index++)); + assertTrue(sink == lst.get(index++)); + assertTrue(sink == lst.get(index++)); + } + + @Test + public void testDestroyString_testGet_testInventory() { + List<DmaapTopicSink> lst = factory.build(makePropBuilder().makeTopic(MY_TOPIC).makeTopic(TOPIC2).build()); + + int index = 0; + DmaapTopicSink sink1 = lst.get(index++); + DmaapTopicSink sink2 = lst.get(index++); + + assertEquals(2, factory.inventory().size()); + assertTrue(factory.inventory().contains(sink1)); + assertTrue(factory.inventory().contains(sink2)); + + sink1.start(); + sink2.start(); + + assertEquals(sink1, factory.get(MY_TOPIC)); + assertEquals(sink2, factory.get(TOPIC2)); + + factory.destroy(MY_TOPIC); + assertFalse(sink1.isAlive()); + assertTrue(sink2.isAlive()); + assertEquals(sink2, factory.get(TOPIC2)); + assertEquals(1, factory.inventory().size()); + assertTrue(factory.inventory().contains(sink2)); + + // repeat + factory.destroy(MY_TOPIC); + assertFalse(sink1.isAlive()); + assertTrue(sink2.isAlive()); + + // with other topic + factory.destroy(TOPIC2); + assertFalse(sink1.isAlive()); + assertFalse(sink2.isAlive()); + assertEquals(0, factory.inventory().size()); + } + + @Test(expected = IllegalArgumentException.class) + public void testDestroyString_NullTopic() { + factory.destroy(null); + } + + @Test(expected = IllegalArgumentException.class) + public void testDestroyString_EmptyTopic() { + factory.destroy(""); + } + + @Test + public void testDestroy() { + List<DmaapTopicSink> lst = factory.build(makePropBuilder().makeTopic(MY_TOPIC).makeTopic(TOPIC2).build()); + + int index = 0; + DmaapTopicSink sink1 = lst.get(index++); + DmaapTopicSink sink2 = lst.get(index++); + + sink1.start(); + sink2.start(); + + factory.destroy(); + assertFalse(sink1.isAlive()); + assertFalse(sink2.isAlive()); + assertEquals(0, factory.inventory().size()); + } + + @Test(expected = IllegalArgumentException.class) + public void testGet_NullTopic() { + factory.get(null); + } + + @Test(expected = IllegalArgumentException.class) + public void testGet_EmptyTopic() { + factory.get(""); + } + + @Test(expected = IllegalArgumentException.class) + public void testGet_UnknownTopic() { + factory.build(makePropBuilder().makeTopic(MY_TOPIC).build()); + factory.get(TOPIC2); + } + + @Test + public void testToString() { + assertTrue(factory.toString().startsWith("IndexedDmaapTopicSinkFactory [")); + } + + private DmaapTopicPropertyBuilder makePropBuilder() { + return new DmaapTopicPropertyBuilder(PROPERTY_DMAAP_SINK_TOPICS); + } + + /** + * Verifies that a parameter has the correct default, if the original builder property + * is not provided. + * + * @param builder used to build a set of properties + * @param builderName name of the builder property + * @param getter function to get the property from a set of parameters + */ + private void checkDefault(TopicPropertyBuilder builder, String builderName, + Function<BusTopicParams, Boolean> getter) { + + /* + * Not sure why the "managed" property is treated differently, but it is. + */ + String prefix = PROPERTY_DMAAP_SINK_TOPICS + "." + MY_TOPIC; + + // always start with a fresh factory + factory.destroy(); + factory = new SinkFactory(); + + Properties props = builder.build(); + props.remove(prefix + builderName); + + assertEquals(1, factory.build(props).size()); + assertTrue(getter.apply(factory.params.get(0))); + + // repeat, this time using an empty string instead of null + factory.destroy(); + factory = new SinkFactory(); + + props.setProperty(prefix + builderName, ""); + + assertEquals(1, factory.build(props).size()); + assertTrue(getter.apply(factory.params.get(0))); + } + + /** + * Verifies that an "additional" property does not exist, if the original builder + * property is not provided. + * + * @param builder used to build a set of properties + * @param builderName name of the builder property + * @param addName name of the "additional" property + */ + private void expectNullAddProp(TopicPropertyBuilder builder, String builderName, String addName) { + // always start with a fresh factory + factory.destroy(); + factory = new SinkFactory(); + + Properties props = builder.build(); + props.remove(PROPERTY_DMAAP_SINK_TOPICS + "." + MY_TOPIC + builderName); + + assertEquals(1, factory.build(props).size()); + assertFalse(factory.params.get(0).getAdditionalProps().containsKey(addName)); + + // repeat, this time using an empty string instead of null + factory.destroy(); + factory = new SinkFactory(); + + props.setProperty(PROPERTY_DMAAP_SINK_TOPICS + "." + MY_TOPIC + builderName, ""); + + assertEquals(1, factory.build(props).size()); + assertFalse(factory.params.get(0).getAdditionalProps().containsKey(addName)); + } + + /** + * Factory that records the parameters of all of the sinks it creates. + */ + private static class SinkFactory extends IndexedDmaapTopicSinkFactory { + private List<BusTopicParams> params = new LinkedList<>(); + + @Override + protected DmaapTopicSink makeSink(BusTopicParams busTopicParams) { + params.add(busTopicParams); + return super.makeSink(busTopicParams); + } + } +} diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSinkTest.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSinkTest.java new file mode 100644 index 00000000..52ba3e9e --- /dev/null +++ b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSinkTest.java @@ -0,0 +1,34 @@ +/* + * ============LICENSE_START======================================================= + * ONAP Policy Engine - Common Modules + * ================================================================================ + * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.common.endpoints.event.comm.bus; + +import static org.junit.Assert.assertNotNull; + +import org.junit.Test; + +public class DmaapTopicSinkTest { + + @Test + public void test() { + assertNotNull(DmaapTopicSink.factory); + } + +} diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSourceFactoryTest.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSourceFactoryTest.java new file mode 100644 index 00000000..cd276de5 --- /dev/null +++ b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSourceFactoryTest.java @@ -0,0 +1,431 @@ +/* + * ============LICENSE_START======================================================= + * ONAP Policy Engine - Common Modules + * ================================================================================ + * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.common.endpoints.event.comm.bus; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_ALLOW_SELF_SIGNED_CERTIFICATES_SUFFIX; +import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_DMAAP_DME2_EP_CONN_TIMEOUT_SUFFIX; +import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_DMAAP_DME2_EP_READ_TIMEOUT_MS_SUFFIX; +import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUNDTRIP_TIMEOUT_MS_SUFFIX; +import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX; +import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_DMAAP_DME2_SESSION_STICKINESS_REQUIRED_SUFFIX; +import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_DMAAP_DME2_SUB_CONTEXT_PATH_SUFFIX; +import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_DMAAP_DME2_VERSION_SUFFIX; +import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS; +import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_HTTP_HTTPS_SUFFIX; +import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX; +import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX; +import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_FETCH_LIMIT_SUFFIX; +import static org.onap.policy.common.endpoints.properties.PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_FETCH_TIMEOUT_SUFFIX; + +import java.util.Arrays; +import java.util.Collections; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.function.Function; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams; + +public class DmaapTopicSourceFactoryTest extends BusTopicTestBase { + + private static final String SERVER = "my-server"; + private static final String TOPIC2 = "my-topic-2"; + + private static final String MY_CONN_TIMEOUT = "200"; + private static final String MY_READ_TIMEOUT = "201"; + private static final String MY_ROUNDTRIP_TIMEOUT = "202"; + private static final String MY_STICKINESS = "true"; + private static final String MY_SUBCONTEXT = "my-subcontext"; + private static final String MY_DME_VERSION = "my-version"; + + private SourceFactory factory; + + /** + * Creates the object to be tested. + */ + @Before + public void setUp() { + super.setUp(); + + factory = new SourceFactory(); + } + + @After + public void tearDown() { + factory.destroy(); + } + + @Test + public void testBuildBusTopicParams() { + // two unmanaged topics + DmaapTopicSource source = factory.build(makeBuilder().managed(false).build()); + DmaapTopicSource source2 = factory.build(makeBuilder().managed(false).topic(TOPIC2).build()); + assertNotNull(source); + assertNotNull(source2); + assertTrue(source != source2); + + // duplicate topics, but since they aren't managed, they should be different + DmaapTopicSource source3 = factory.build(makeBuilder().managed(false).build()); + DmaapTopicSource source4 = factory.build(makeBuilder().managed(false).build()); + assertNotNull(source3); + assertNotNull(source4); + assertTrue(source != source3); + assertTrue(source != source4); + assertTrue(source3 != source4); + + // two managed topics + DmaapTopicSource source5 = factory.build(makeBuilder().build()); + DmaapTopicSource source6 = factory.build(makeBuilder().topic(TOPIC2).build()); + assertNotNull(source5); + assertNotNull(source6); + + // re-build same managed topics - should get exact same objects + assertTrue(source5 == factory.build(BusTopicParams.builder().topic(MY_TOPIC).build())); + assertTrue(source6 == factory.build(makeBuilder().topic(TOPIC2).build())); + } + + @Test(expected = IllegalArgumentException.class) + public void testBuildBusTopicParams_NullTopic() { + factory.build(makeBuilder().topic(null).build()); + } + + @Test(expected = IllegalArgumentException.class) + public void testBuildBusTopicParams_EmptyTopic() { + factory.build(makeBuilder().topic("").build()); + } + + @Test + public void testBuildProperties() { + assertEquals(1, factory.build(makePropBuilder().makeTopic(MY_TOPIC).build()).size()); + + BusTopicParams params = factory.params.get(0); + assertEquals(true, params.isManaged()); + assertEquals(true, params.isUseHttps()); + assertEquals(true, params.isAllowSelfSignedCerts()); + assertEquals(MY_API_KEY, params.getApiKey()); + assertEquals(MY_API_SECRET, params.getApiSecret()); + assertEquals(MY_ENV, params.getEnvironment()); + assertEquals(MY_LAT, params.getLatitude()); + assertEquals(MY_LONG, params.getLongitude()); + assertEquals(MY_PARTNER, params.getPartner()); + assertEquals(Arrays.asList(SERVER), params.getServers()); + assertEquals(MY_TOPIC, params.getTopic()); + assertEquals(MY_FETCH_LIMIT, params.getFetchLimit()); + assertEquals(MY_FETCH_TIMEOUT, params.getFetchTimeout()); + + Map<String, String> add = params.getAdditionalProps(); + assertEquals(MY_CONN_TIMEOUT, add.get(DmaapTopicSourceFactory.DME2_EP_CONN_TIMEOUT_PROPERTY)); + assertEquals(MY_READ_TIMEOUT, add.get(DmaapTopicSourceFactory.DME2_READ_TIMEOUT_PROPERTY)); + assertEquals(MY_ROUNDTRIP_TIMEOUT, add.get(DmaapTopicSourceFactory.DME2_ROUNDTRIP_TIMEOUT_PROPERTY)); + assertEquals(MY_ROUTE, add.get(DmaapTopicSourceFactory.DME2_ROUTE_OFFER_PROPERTY)); + assertEquals(MY_STICKINESS, add.get(DmaapTopicSourceFactory.DME2_SESSION_STICKINESS_REQUIRED_PROPERTY)); + assertEquals(MY_SUBCONTEXT, add.get(DmaapTopicSourceFactory.DME2_SUBCONTEXT_PATH_PROPERTY)); + assertEquals(MY_DME_VERSION, add.get(DmaapTopicSourceFactory.DME2_VERSION_PROPERTY)); + } + + @Test + public void testBuildProperties_Variations() { + TopicPropertyBuilder builder = makePropBuilder().makeTopic(MY_TOPIC); + + // null sources + Properties props = builder.build(); + props.remove(PROPERTY_DMAAP_SOURCE_TOPICS); + assertTrue(factory.build(props).isEmpty()); + + // empty sources + props = builder.build(); + props.setProperty(PROPERTY_DMAAP_SOURCE_TOPICS, ""); + assertTrue(factory.build(props).isEmpty()); + + // null servers + assertTrue(factory.build(makePropBuilder().makeTopic(MY_TOPIC) + .removeTopicProperty(PROPERTY_TOPIC_SERVERS_SUFFIX).build()).isEmpty()); + + // empty servers + assertTrue(factory.build(makePropBuilder().makeTopic(MY_TOPIC) + .removeTopicProperty(PROPERTY_TOPIC_SERVERS_SUFFIX).build()).isEmpty()); + + // check boolean properties that default to true + checkDefault(builder, PROPERTY_MANAGED_SUFFIX, BusTopicParams::isManaged); + + // check boolean properties that default to false + checkDefault(builder, PROPERTY_HTTP_HTTPS_SUFFIX, params -> !params.isUseHttps()); + checkDefault(builder, PROPERTY_ALLOW_SELF_SIGNED_CERTIFICATES_SUFFIX, + params -> !params.isAllowSelfSignedCerts()); + + // check other properties having default values + checkDefault(builder, PROPERTY_TOPIC_SOURCE_FETCH_TIMEOUT_SUFFIX, + params -> params.getFetchTimeout() == DmaapTopicSource.DEFAULT_TIMEOUT_MS_FETCH, null, "", + "invalid-timeout"); + checkDefault(builder, PROPERTY_TOPIC_SOURCE_FETCH_LIMIT_SUFFIX, + params -> params.getFetchLimit() == DmaapTopicSource.DEFAULT_LIMIT_FETCH, null, "", + "invalid-limit"); + + // check "additional" properties + expectNullAddProp(builder, PROPERTY_DMAAP_DME2_EP_CONN_TIMEOUT_SUFFIX, + DmaapTopicSourceFactory.DME2_EP_CONN_TIMEOUT_PROPERTY); + + expectNullAddProp(builder, PROPERTY_DMAAP_DME2_EP_READ_TIMEOUT_MS_SUFFIX, + DmaapTopicSourceFactory.DME2_READ_TIMEOUT_PROPERTY); + + expectNullAddProp(builder, PROPERTY_DMAAP_DME2_ROUNDTRIP_TIMEOUT_MS_SUFFIX, + DmaapTopicSourceFactory.DME2_ROUNDTRIP_TIMEOUT_PROPERTY); + + expectNullAddProp(builder, PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX, + DmaapTopicSourceFactory.DME2_ROUTE_OFFER_PROPERTY); + + expectNullAddProp(builder, PROPERTY_DMAAP_DME2_SESSION_STICKINESS_REQUIRED_SUFFIX, + DmaapTopicSourceFactory.DME2_SESSION_STICKINESS_REQUIRED_PROPERTY); + + expectNullAddProp(builder, PROPERTY_DMAAP_DME2_SUB_CONTEXT_PATH_SUFFIX, + DmaapTopicSourceFactory.DME2_SUBCONTEXT_PATH_PROPERTY); + + expectNullAddProp(builder, PROPERTY_DMAAP_DME2_VERSION_SUFFIX, DmaapTopicSourceFactory.DME2_VERSION_PROPERTY); + } + + @Test + public void testBuildProperties_Multiple() { + TopicPropertyBuilder builder = + makePropBuilder().makeTopic(MY_TOPIC).makeTopic(TOPIC2).addTopic(MY_TOPIC).addTopic(MY_TOPIC); + + List<DmaapTopicSource> lst = factory.build(builder.build()); + assertEquals(4, lst.size()); + + int index = 0; + DmaapTopicSource source = lst.get(index++); + assertTrue(source != lst.get(index++)); + assertTrue(source == lst.get(index++)); + assertTrue(source == lst.get(index++)); + } + + @Test + public void testBuildListOfStringStringStringString() { + DmaapTopicSource source1 = factory.build(servers, MY_TOPIC, MY_API_KEY, MY_API_SECRET); + assertNotNull(source1); + + // check parameters that were used + BusTopicParams params = factory.params.get(0); + assertEquals(servers, params.getServers()); + assertEquals(MY_TOPIC, params.getTopic()); + assertEquals(true, params.isManaged()); + assertEquals(false, params.isUseHttps()); + assertEquals(false, params.isAllowSelfSignedCerts()); + assertEquals(MY_API_KEY, params.getApiKey()); + assertEquals(MY_API_SECRET, params.getApiSecret()); + assertEquals(DmaapTopicSource.DEFAULT_LIMIT_FETCH, params.getFetchLimit()); + assertEquals(DmaapTopicSource.DEFAULT_TIMEOUT_MS_FETCH, params.getFetchTimeout()); + } + + @Test + public void testBuildListOfStringString() { + DmaapTopicSource source1 = factory.build(servers, MY_TOPIC); + assertNotNull(source1); + + // check parameters that were used + BusTopicParams params = factory.params.get(0); + assertEquals(servers, params.getServers()); + assertEquals(MY_TOPIC, params.getTopic()); + assertEquals(true, params.isManaged()); + assertEquals(false, params.isUseHttps()); + assertEquals(false, params.isAllowSelfSignedCerts()); + assertEquals(null, params.getApiKey()); + assertEquals(null, params.getApiSecret()); + assertEquals(DmaapTopicSource.DEFAULT_LIMIT_FETCH, params.getFetchLimit()); + assertEquals(DmaapTopicSource.DEFAULT_TIMEOUT_MS_FETCH, params.getFetchTimeout()); + + DmaapTopicSource source2 = factory.build(servers, TOPIC2); + assertNotNull(source2); + assertTrue(source1 != source2); + + // duplicate - should be the same as these topics are managed + DmaapTopicSource source3 = factory.build(Collections.emptyList(), TOPIC2); + assertTrue(source2 == source3); + } + + @Test + public void testDestroyString_testGet_testInventory() { + List<DmaapTopicSource> lst = factory.build(makePropBuilder().makeTopic(MY_TOPIC).makeTopic(TOPIC2).build()); + + int index = 0; + DmaapTopicSource source1 = lst.get(index++); + DmaapTopicSource source2 = lst.get(index++); + + assertEquals(2, factory.inventory().size()); + assertTrue(factory.inventory().contains(source1)); + assertTrue(factory.inventory().contains(source2)); + + source1.start(); + source2.start(); + + assertEquals(source1, factory.get(MY_TOPIC)); + assertEquals(source2, factory.get(TOPIC2)); + + factory.destroy(MY_TOPIC); + assertFalse(source1.isAlive()); + assertTrue(source2.isAlive()); + assertEquals(source2, factory.get(TOPIC2)); + assertEquals(1, factory.inventory().size()); + assertTrue(factory.inventory().contains(source2)); + + // repeat + factory.destroy(MY_TOPIC); + assertFalse(source1.isAlive()); + assertTrue(source2.isAlive()); + + // with other topic + factory.destroy(TOPIC2); + assertFalse(source1.isAlive()); + assertFalse(source2.isAlive()); + assertEquals(0, factory.inventory().size()); + } + + @Test(expected = IllegalArgumentException.class) + public void testDestroyString_NullTopic() { + factory.destroy(null); + } + + @Test(expected = IllegalArgumentException.class) + public void testDestroyString_EmptyTopic() { + factory.destroy(""); + } + + @Test + public void testDestroy() { + List<DmaapTopicSource> lst = factory.build(makePropBuilder().makeTopic(MY_TOPIC).makeTopic(TOPIC2).build()); + + int index = 0; + DmaapTopicSource source1 = lst.get(index++); + DmaapTopicSource source2 = lst.get(index++); + + source1.start(); + source2.start(); + + factory.destroy(); + assertFalse(source1.isAlive()); + assertFalse(source2.isAlive()); + assertEquals(0, factory.inventory().size()); + } + + @Test(expected = IllegalArgumentException.class) + public void testGet_NullTopic() { + factory.get(null); + } + + @Test(expected = IllegalArgumentException.class) + public void testGet_EmptyTopic() { + factory.get(""); + } + + @Test(expected = IllegalArgumentException.class) + public void testGet_UnknownTopic() { + factory.build(makePropBuilder().makeTopic(MY_TOPIC).build()); + factory.get(TOPIC2); + } + + @Test + public void testToString() { + assertTrue(factory.toString().startsWith("IndexedDmaapTopicSourceFactory [")); + } + + private DmaapTopicPropertyBuilder makePropBuilder() { + return new DmaapTopicPropertyBuilder(PROPERTY_DMAAP_SOURCE_TOPICS); + } + + /** + * Verifies that a parameter has the correct default, if the original builder property + * is not provided. + * + * @param builder used to build a set of properties + * @param builderName name of the builder property + * @param getter function to get the property from a set of parameters + * @param values possible values to try, defaults to {null, ""} + */ + private void checkDefault(TopicPropertyBuilder builder, String builderName, + Function<BusTopicParams, Boolean> getter, Object... values) { + + Object[] values2 = (values.length > 0 ? values : new String[] {null, ""}); + + for (Object value : values2) { + // always start with a fresh factory + factory.destroy(); + factory = new SourceFactory(); + + if (value == null) { + builder.removeTopicProperty(builderName); + + } else { + builder.setTopicProperty(builderName, value.toString()); + } + + assertEquals(1, factory.build(builder.build()).size()); + assertTrue(getter.apply(factory.params.get(0))); + } + } + + /** + * Verifies that an "additional" property does not exist, if the original builder + * property is not provided. + * + * @param builder used to build a set of properties + * @param builderName name of the builder property + * @param addName name of the "additional" property + */ + private void expectNullAddProp(TopicPropertyBuilder builder, String builderName, String addName) { + // always start with a fresh factory + factory.destroy(); + factory = new SourceFactory(); + + Properties props = builder.build(); + props.remove(PROPERTY_DMAAP_SOURCE_TOPICS + "." + MY_TOPIC + builderName); + + assertEquals(1, factory.build(props).size()); + assertFalse(factory.params.get(0).getAdditionalProps().containsKey(addName)); + + // repeat, this time using an empty string instead of null + factory.destroy(); + factory = new SourceFactory(); + + props.setProperty(PROPERTY_DMAAP_SOURCE_TOPICS + "." + MY_TOPIC + builderName, ""); + + assertEquals(1, factory.build(props).size()); + assertFalse(factory.params.get(0).getAdditionalProps().containsKey(addName)); + } + + /** + * Factory that records the parameters of all of the sources it creates. + */ + private static class SourceFactory extends IndexedDmaapTopicSourceFactory { + private List<BusTopicParams> params = new LinkedList<>(); + + @Override + protected DmaapTopicSource makeSource(BusTopicParams busTopicParams) { + params.add(busTopicParams); + return super.makeSource(busTopicParams); + } + } +} diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSourceTest.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSourceTest.java new file mode 100644 index 00000000..6828444a --- /dev/null +++ b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSourceTest.java @@ -0,0 +1,34 @@ +/* + * ============LICENSE_START======================================================= + * ONAP Policy Engine - Common Modules + * ================================================================================ + * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.common.endpoints.event.comm.bus; + +import static org.junit.Assert.assertNotNull; + +import org.junit.Test; + +public class DmaapTopicSourceTest { + + @Test + public void test() { + assertNotNull(DmaapTopicSource.factory); + } + +} diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/NoopTopicSinkTest.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/NoopTopicSinkTest.java new file mode 100644 index 00000000..63c29111 --- /dev/null +++ b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/NoopTopicSinkTest.java @@ -0,0 +1,126 @@ +/* + * ============LICENSE_START======================================================= + * policy-endpoints + * ================================================================================ + * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.common.endpoints.event.comm.bus; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; + +import java.util.Arrays; +import org.junit.Before; +import org.junit.Test; +import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure; +import org.onap.policy.common.endpoints.event.comm.TopicListener; + +public class NoopTopicSinkTest extends BusTopicTestBase { + + private NoopTopicSink sink; + + /** + * Creates the object to be tested. + */ + @Before + public void setUp() { + super.setUp(); + + sink = new NoopTopicSink(servers, MY_TOPIC); + } + + @Test + public void testToString() { + assertTrue(sink.toString().startsWith("NoopTopicSink [")); + } + + @Test + public void testSend() { + TopicListener listener = mock(TopicListener.class); + sink.register(listener); + sink.start(); + + assertTrue(sink.send(MY_MESSAGE)); + + assertEquals(Arrays.asList(MY_MESSAGE), Arrays.asList(sink.getRecentEvents())); + verify(listener).onTopicEvent(CommInfrastructure.NOOP, MY_TOPIC, MY_MESSAGE); + + // generate exception during broadcast + sink = new NoopTopicSink(servers, MY_TOPIC) { + @Override + protected boolean broadcast(String message) { + throw new RuntimeException(EXPECTED); + } + + }; + + sink.start(); + assertFalse(sink.send(MY_MESSAGE)); + } + + @Test(expected = IllegalArgumentException.class) + public void testSend_NullMessage() { + sink.send(null); + } + + @Test(expected = IllegalArgumentException.class) + public void testSend_EmptyMessage() { + sink.send(""); + } + + @Test(expected = IllegalStateException.class) + public void testSend_NotStarted() { + sink.send(MY_MESSAGE); + } + + @Test + public void testGetTopicCommInfrastructure() { + assertEquals(CommInfrastructure.NOOP, sink.getTopicCommInfrastructure()); + } + + @Test + public void testStart_testStop_testShutdown() { + sink.start(); + assertTrue(sink.isAlive()); + + // start again + sink.start(); + assertTrue(sink.isAlive()); + + // stop + sink.stop(); + assertFalse(sink.isAlive()); + + // re-start again + sink.start(); + assertTrue(sink.isAlive()); + + // shutdown + sink.shutdown(); + assertFalse(sink.isAlive()); + } + + @Test(expected = IllegalStateException.class) + public void testStart_Locked() { + sink.lock(); + sink.start(); + } + +} diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/TopicPropertyBuilder.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/TopicPropertyBuilder.java new file mode 100644 index 00000000..4982d11d --- /dev/null +++ b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/TopicPropertyBuilder.java @@ -0,0 +1,110 @@ +/* + * ============LICENSE_START======================================================= + * ONAP Policy Engine - Common Modules + * ================================================================================ + * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.common.endpoints.event.comm.bus; + +import java.util.Properties; + +/** + * Builder of properties used when configuring topics. + */ +public class TopicPropertyBuilder { + private final Properties properties = new Properties(); + private final String prefix; + private String topicPrefix; + + /** + * Constructs the object. + * + * @param prefix the prefix for the properties to be built + */ + public TopicPropertyBuilder(String prefix) { + this.prefix = prefix; + properties.setProperty(prefix, ""); + } + + /** + * Constructs the properties from the builder. + * + * @return a copy of the properties + */ + public Properties build() { + Properties props = new Properties(); + props.putAll(properties); + + return props; + } + + /** + * Adds a topic to the list of topics. Also sets the current topic so that subsequent + * invocations of property methods will manipulate the topic's properties. + * + * @param topic the topic to be added + * @return this builder + */ + public TopicPropertyBuilder addTopic(String topic) { + // add topic to the list of topics + String topicList = properties.getProperty(prefix); + if (!topicList.isEmpty()) { + topicList += ","; + } + topicList += topic; + properties.setProperty(prefix, topicList); + + setTopic(topic); + + return this; + } + + /** + * Sets the topic for which subsequent properties will be managed. + * + * @param topic the topic + * @return this builder + */ + public TopicPropertyBuilder setTopic(String topic) { + this.topicPrefix = prefix + "." + topic; + return this; + } + + /** + * Sets a topic's property. + * + * @param name name of the property + * @param value value to which the property should be set + * @return this builder + */ + public TopicPropertyBuilder setTopicProperty(String name, Object value) { + properties.setProperty(topicPrefix + name, value.toString()); + return this; + } + + /** + * Removes a topic's property. + * + * @param name name of the property + * @return this builder + */ + public TopicPropertyBuilder removeTopicProperty(String name) { + properties.remove(topicPrefix + name); + return this; + } +} + |