aboutsummaryrefslogtreecommitdiffstats
path: root/policy-endpoints/src
diff options
context:
space:
mode:
Diffstat (limited to 'policy-endpoints/src')
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java7
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java14
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedBusTopicSource.java11
-rw-r--r--policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/BusTopicTestBase.java121
-rw-r--r--policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumerTest.java236
-rw-r--r--policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisherTest.java217
-rw-r--r--policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusTopicBaseTest.java124
-rw-r--r--policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusTopicParamsTest.java148
-rw-r--r--policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/TopicBaseTest.java295
9 files changed, 1164 insertions, 9 deletions
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java
index 690a6d0b..746798fa 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java
@@ -322,7 +322,7 @@ public interface BusConsumer {
logger.debug("DMaaP consumer received {} : {}" + response.getResponseCode(),
response.getResponseMessage());
- if (response.getResponseCode() == null || !"200".equals(response.getResponseCode())) {
+ if (!"200".equals(response.getResponseCode())) {
logger.error("DMaaP consumer received: {} : {}", response.getResponseCode(),
response.getResponseMessage());
@@ -443,8 +443,9 @@ public interface BusConsumer {
super(busTopicParams);
- final String dme2RouteOffer = busTopicParams.getAdditionalProps()
- .get(DmaapTopicSinkFactory.DME2_ROUTE_OFFER_PROPERTY);
+ final String dme2RouteOffer = (busTopicParams.isAdditionalPropsValid()
+ ? busTopicParams.getAdditionalProps().get(DmaapTopicSinkFactory.DME2_ROUTE_OFFER_PROPERTY)
+ : null);
if (busTopicParams.isEnvironmentInvalid()) {
throw parmException(busTopicParams.getTopic(),
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java
index 716ce95b..3365b4ec 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java
@@ -204,6 +204,8 @@ public interface BusPublisher {
this.publisher = new MRSimplerBatchPublisher.Builder().againstUrls(dmaapServers).onTopic(topic).build();
this.publisher.setProtocolFlag(ProtocolTypeConstants.DME2.getValue());
+ } else {
+ throw new IllegalArgumentException("Invalid DMaaP protocol " + protocol);
}
this.publisher.logTo(LoggerFactory.getLogger(MRSimplerBatchPublisher.class.getName()));
@@ -361,12 +363,14 @@ public interface BusPublisher {
props.setProperty("TransportType", "DME2");
props.setProperty("MethodType", "POST");
- for (Map.Entry<String, String> entry : busTopicParams.getAdditionalProps().entrySet()) {
- String key = entry.getKey();
- String value = entry.getValue();
+ if (busTopicParams.isAdditionalPropsValid()) {
+ for (Map.Entry<String, String> entry : busTopicParams.getAdditionalProps().entrySet()) {
+ String key = entry.getKey();
+ String value = entry.getValue();
- if (value != null) {
- props.setProperty(key, value);
+ if (value != null) {
+ props.setProperty(key, value);
+ }
}
}
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedBusTopicSource.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedBusTopicSource.java
index b9463e81..b588d1f3 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedBusTopicSource.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedBusTopicSource.java
@@ -168,7 +168,7 @@ public abstract class SingleThreadedBusTopicSource extends BusTopicBase
try {
this.init();
this.alive = true;
- this.busPollerThread = new Thread(this);
+ this.busPollerThread = makePollerThread();
this.busPollerThread.setName(this.getTopicCommInfrastructure() + "-source-" + this.getTopic());
busPollerThread.start();
} catch (Exception e) {
@@ -181,6 +181,15 @@ public abstract class SingleThreadedBusTopicSource extends BusTopicBase
return this.alive;
}
+ /**
+ * Makes a new thread to be used for polling.
+ *
+ * @return a new Thread
+ */
+ protected Thread makePollerThread() {
+ return new Thread(this);
+ }
+
@Override
public boolean stop() {
logger.info("{}: stopping", this);
diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/BusTopicTestBase.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/BusTopicTestBase.java
new file mode 100644
index 00000000..ba52f191
--- /dev/null
+++ b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/BusTopicTestBase.java
@@ -0,0 +1,121 @@
+/*
+ * ============LICENSE_START=======================================================
+ * policy-endpoints
+ * ================================================================================
+ * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.common.endpoints.event.comm.bus;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams;
+import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams.TopicParamsBuilder;
+
+/**
+ * Base class for BusTopicXxxTest classes.
+ */
+public class BusTopicTestBase {
+
+ public static final String MY_AFT_ENV = "my-aft-env";
+ public static final String MY_API_KEY = "my-api-key";
+ public static final String MY_API_SECRET = "my-api-secret";
+ public static final String MY_BASE_PATH = "my-base";
+ public static final String MY_CLIENT_NAME = "my-client";
+ public static final String MY_CONS_GROUP = "my-cons-group";
+ public static final String MY_CONS_INST = "my-cons-inst";
+ public static final String MY_ENV = "my-env";
+ public static final int MY_FETCH_LIMIT = 100;
+ public static final int MY_FETCH_TIMEOUT = 101;
+ public static final String MY_HOST = "my-host";
+ public static final String MY_LAT = "my-lat";
+ public static final String MY_LONG = "my-long";
+ public static final String MY_PARTNER = "my-partner";
+ public static final String MY_PASSWD = "my-pass";
+ public static final int MY_PORT = 102;
+ public static final String MY_TOPIC = "my-topic";
+ public static final String MY_USERNAME = "my-user";
+
+ public static final String MY_MESSAGE = "my-message";
+ public static final String MY_PARTITION = "my-partition";
+ public static final String MY_MESSAGE2 = "my-message-2";
+ public static final String MY_PARTITION2 = "my-partition-2";
+
+ public static final String ROUTE_PROP = "routeOffer";
+ public static final String MY_ROUTE = "my-route";
+
+ /**
+ * Message used within exceptions that are expected.
+ */
+ public static final String EXPECTED = "expected exception";
+
+ /**
+ * Additional properties to be added to the parameter builder.
+ */
+ protected Map<String, String> addProps;
+
+ /**
+ * Servers to be added to the parameter builder.
+ */
+ protected List<String> servers;
+
+ /**
+ * Parameter builder used to build topic parameters.
+ */
+ protected TopicParamsBuilder builder;
+
+ /**
+ * Initializes {@link #addProps}, {@link #servers}, and {@link #builder}.
+ */
+ public void setUp() {
+ addProps = new TreeMap<>();
+ addProps.put("my-key-A", "my-value-A");
+ addProps.put("my-key-B", "my-value-B");
+
+ servers = Arrays.asList("svra", "svrb");
+
+ builder = makeBuilder();
+ }
+
+ /**
+ * Makes a fully populated parameter builder.
+ *
+ * @return a new parameter builder
+ */
+ public TopicParamsBuilder makeBuilder() {
+ return makeBuilder(addProps, servers);
+ }
+
+ /**
+ * Makes a fully populated parameter builder.
+ *
+ * @param addProps additional properties to be added to the builder
+ * @param servers servers to be added to the builder
+ * @return a new parameter builder
+ */
+ public TopicParamsBuilder makeBuilder(Map<String, String> addProps, List<String> servers) {
+
+ return BusTopicParams.builder().additionalProps(addProps).aftEnvironment(MY_AFT_ENV).allowSelfSignedCerts(true)
+ .apiKey(MY_API_KEY).apiSecret(MY_API_SECRET).basePath(MY_BASE_PATH).clientName(MY_CLIENT_NAME)
+ .consumerGroup(MY_CONS_GROUP).consumerInstance(MY_CONS_INST).environment(MY_ENV)
+ .fetchLimit(MY_FETCH_LIMIT).fetchTimeout(MY_FETCH_TIMEOUT).hostname(MY_HOST).latitude(MY_LAT)
+ .longitude(MY_LONG).managed(true).partitionId(MY_PARTITION).partner(MY_PARTNER)
+ .password(MY_PASSWD).port(MY_PORT).servers(servers).topic(MY_TOPIC).useHttps(true)
+ .userName(MY_USERNAME);
+ }
+}
diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumerTest.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumerTest.java
new file mode 100644
index 00000000..ef4d5a00
--- /dev/null
+++ b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumerTest.java
@@ -0,0 +1,236 @@
+/*
+ * ============LICENSE_START=======================================================
+ * policy-endpoints
+ * ================================================================================
+ * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.common.endpoints.event.comm.bus.internal;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import com.att.aft.dme2.internal.apache.commons.collections.IteratorUtils;
+import com.att.nsa.cambria.client.CambriaConsumer;
+import com.att.nsa.mr.client.impl.MRConsumerImpl;
+import com.att.nsa.mr.client.response.MRConsumerResponse;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import org.junit.Before;
+import org.junit.Test;
+import org.onap.policy.common.endpoints.event.comm.bus.BusTopicTestBase;
+import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer.CambriaConsumerWrapper;
+import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer.DmaapAafConsumerWrapper;
+import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer.DmaapConsumerWrapper;
+import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer.DmaapDmeConsumerWrapper;
+import org.powermock.reflect.Whitebox;
+
+public class BusConsumerTest extends BusTopicTestBase {
+
+ @Before
+ public void setUp() {
+ super.setUp();
+ }
+
+ @Test
+ public void testCambriaConsumerWrapper() {
+ // verify that different wrappers can be built
+ new CambriaConsumerWrapper(makeBuilder().build());
+ new CambriaConsumerWrapper(makeBuilder().useHttps(false).build());
+ new CambriaConsumerWrapper(makeBuilder().useHttps(true).build());
+ new CambriaConsumerWrapper(makeBuilder().useHttps(true).allowSelfSignedCerts(false).build());
+ new CambriaConsumerWrapper(makeBuilder().useHttps(true).allowSelfSignedCerts(true).build());
+ new CambriaConsumerWrapper(makeBuilder().apiKey(null).build());
+ new CambriaConsumerWrapper(makeBuilder().apiSecret(null).build());
+ new CambriaConsumerWrapper(makeBuilder().apiKey(null).apiSecret(null).build());
+ new CambriaConsumerWrapper(makeBuilder().userName(null).build());
+ new CambriaConsumerWrapper(makeBuilder().password(null).build());
+ new CambriaConsumerWrapper(makeBuilder().userName(null).password(null).build());
+ }
+
+ @Test
+ public void testCambriaConsumerWrapperFetch() throws Exception {
+ CambriaConsumer inner = mock(CambriaConsumer.class);
+ List<String> lst = Arrays.asList(MY_MESSAGE, MY_MESSAGE2);
+ when(inner.fetch()).thenReturn(lst);
+
+ CambriaConsumerWrapper cons = new CambriaConsumerWrapper(builder.build());
+ Whitebox.setInternalState(cons, "consumer", inner);
+
+ assertEquals(lst, IteratorUtils.toList(cons.fetch().iterator()));
+
+ // arrange to throw exception next time fetch is called
+ IOException ex = new IOException(EXPECTED);
+ when(inner.fetch()).thenThrow(ex);
+
+ cons.fetchTimeout = 10;
+
+ try {
+ cons.fetch();
+ fail("missing exception");
+
+ } catch (IOException | InterruptedException e) {
+ assertEquals(ex, e);
+ }
+ }
+
+ @Test
+ public void testCambriaConsumerWrapperClose() throws Exception {
+ CambriaConsumerWrapper cons = new CambriaConsumerWrapper(builder.build());
+
+ // set filter several times to cause different branches of close() to be executed
+ for (int count = 0; count < 3; ++count) {
+ cons.close();
+ cons.setFilter("close=" + count);
+ }
+ }
+
+ @Test
+ public void testCambriaConsumerWrapperSetFilter() {
+ // set filter several times to cause different branches to be executed
+ CambriaConsumerWrapper cons = new CambriaConsumerWrapper(builder.build());
+ for (int count = 0; count < 3; ++count) {
+ cons.setFilter("set-filter=" + count);
+ }
+ }
+
+ @Test
+ public void testCambriaConsumerWrapperToString() {
+ assertNotNull(new CambriaConsumerWrapper(makeBuilder().build()).toString());
+ }
+
+ @Test
+ public void testDmaapConsumerWrapper() throws Exception {
+ // verify that different wrappers can be built
+ new DmaapAafConsumerWrapper(makeBuilder().build());
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testDmaapConsumerWrapper_InvalidTopic() throws Exception {
+ new DmaapAafConsumerWrapper(makeBuilder().topic(null).build());
+ }
+
+ @Test
+ public void testDmaapConsumerWrapperFetch() throws Exception {
+ DmaapAafConsumerWrapper dmaap = new DmaapAafConsumerWrapper(makeBuilder().build());
+ MRConsumerImpl cons = mock(MRConsumerImpl.class);
+
+ dmaap.fetchTimeout = 5;
+ dmaap.consumer = cons;
+
+ // null return
+ when(cons.fetchWithReturnConsumerResponse()).thenReturn(null);
+ assertFalse(dmaap.fetch().iterator().hasNext());
+
+ // with messages, 200
+ List<String> lst = Arrays.asList(MY_MESSAGE, MY_MESSAGE2);
+ MRConsumerResponse resp = new MRConsumerResponse();
+ resp.setResponseCode("200");
+ resp.setActualMessages(lst);
+ when(cons.fetchWithReturnConsumerResponse()).thenReturn(resp);
+
+ assertEquals(lst, IteratorUtils.toList(dmaap.fetch().iterator()));
+
+ // null messages
+ resp.setActualMessages(null);
+ when(cons.fetchWithReturnConsumerResponse()).thenReturn(resp);
+
+ assertFalse(dmaap.fetch().iterator().hasNext());
+
+ // with messages, NOT 200
+ resp.setResponseCode("400");
+ resp.setActualMessages(lst);
+ when(cons.fetchWithReturnConsumerResponse()).thenReturn(resp);
+
+ assertEquals(lst, IteratorUtils.toList(dmaap.fetch().iterator()));
+ }
+
+ @Test
+ public void testDmaapConsumerWrapperClose() throws Exception {
+ new DmaapAafConsumerWrapper(makeBuilder().build()).close();
+ }
+
+ @Test
+ public void testDmaapConsumerWrapperToString() throws Exception {
+ assertNotNull(new DmaapConsumerWrapper(makeBuilder().build()) {}.toString());
+ }
+
+ @Test
+ public void testDmaapAafConsumerWrapper() throws Exception {
+ // verify that different wrappers can be built
+ new DmaapAafConsumerWrapper(makeBuilder().useHttps(true).build());
+ new DmaapAafConsumerWrapper(makeBuilder().useHttps(false).build());
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testDmaapAafConsumerWrapper_InvalidServers() throws Exception {
+ /*
+ * Unfortunately, the MR code intercepts this and throws an exception before the
+ * wrapper gets a chance to check it, thus this test does not improve the coverage
+ * for the constructor.
+ */
+ new DmaapAafConsumerWrapper(makeBuilder().servers(Collections.emptyList()).build());
+ }
+
+ @Test
+ public void testDmaapAafConsumerWrapperToString() throws Exception {
+ assertNotNull(new DmaapAafConsumerWrapper(makeBuilder().build()).toString());
+ }
+
+ @Test
+ public void testDmaapDmeConsumerWrapper() throws Exception {
+ // verify that different wrappers can be built
+ new DmaapDmeConsumerWrapper(makeBuilder().build());
+ new DmaapDmeConsumerWrapper(makeBuilder().useHttps(true).build());
+ new DmaapDmeConsumerWrapper(makeBuilder().useHttps(false).build());
+ new DmaapDmeConsumerWrapper(makeBuilder().additionalProps(null).build());
+
+ addProps.put(ROUTE_PROP, MY_ROUTE);
+ new DmaapDmeConsumerWrapper(makeBuilder().build());
+ new DmaapDmeConsumerWrapper(makeBuilder().partner(null).build());
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testDmaapDmeConsumerWrapper_InvalidEnvironment() throws Exception {
+ new DmaapDmeConsumerWrapper(makeBuilder().environment(null).build());
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testDmaapDmeConsumerWrapper_InvalidAft() throws Exception {
+ new DmaapDmeConsumerWrapper(makeBuilder().aftEnvironment(null).build());
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testDmaapDmeConsumerWrapper_InvalidLat() throws Exception {
+ new DmaapDmeConsumerWrapper(makeBuilder().latitude(null).build());
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testDmaapDmeConsumerWrapper_InvalidLong() throws Exception {
+ new DmaapDmeConsumerWrapper(makeBuilder().longitude(null).build());
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testDmaapDmeConsumerWrapper_InvalidPartner() throws Exception {
+ new DmaapDmeConsumerWrapper(makeBuilder().partner(null).build());
+ }
+}
diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisherTest.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisherTest.java
new file mode 100644
index 00000000..4e78b676
--- /dev/null
+++ b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisherTest.java
@@ -0,0 +1,217 @@
+/*
+ * ============LICENSE_START=======================================================
+ * policy-endpoints
+ * ================================================================================
+ * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.common.endpoints.event.comm.bus.internal;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.att.nsa.cambria.client.CambriaBatchingPublisher;
+import com.att.nsa.mr.client.impl.MRSimplerBatchPublisher;
+import com.att.nsa.mr.client.response.MRPublisherResponse;
+import com.att.nsa.mr.test.clients.ProtocolTypeConstants;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.concurrent.TimeUnit;
+import org.junit.Before;
+import org.junit.Test;
+import org.onap.policy.common.endpoints.event.comm.bus.BusTopicTestBase;
+import org.onap.policy.common.endpoints.event.comm.bus.internal.BusPublisher.CambriaPublisherWrapper;
+import org.onap.policy.common.endpoints.event.comm.bus.internal.BusPublisher.DmaapAafPublisherWrapper;
+import org.onap.policy.common.endpoints.event.comm.bus.internal.BusPublisher.DmaapDmePublisherWrapper;
+import org.onap.policy.common.endpoints.event.comm.bus.internal.BusPublisher.DmaapPublisherWrapper;
+
+public class BusPublisherTest extends BusTopicTestBase {
+
+ @Before
+ public void setUp() {
+ super.setUp();
+ }
+
+ @Test
+ public void testCambriaPublisherWrapper() {
+ // verify that different wrappers can be built
+ new CambriaPublisherWrapper(makeBuilder().build());
+ new CambriaPublisherWrapper(makeBuilder().useHttps(false).build());
+ new CambriaPublisherWrapper(makeBuilder().useHttps(true).build());
+ new CambriaPublisherWrapper(makeBuilder().useHttps(true).allowSelfSignedCerts(false).build());
+ new CambriaPublisherWrapper(makeBuilder().useHttps(true).allowSelfSignedCerts(true).build());
+ new CambriaPublisherWrapper(makeBuilder().apiKey(null).build());
+ new CambriaPublisherWrapper(makeBuilder().apiSecret(null).build());
+ new CambriaPublisherWrapper(makeBuilder().apiKey(null).apiSecret(null).build());
+ new CambriaPublisherWrapper(makeBuilder().userName(null).build());
+ new CambriaPublisherWrapper(makeBuilder().password(null).build());
+ new CambriaPublisherWrapper(makeBuilder().userName(null).password(null).build());
+ }
+
+ @Test
+ public void testCambriaPublisherWrapperSend() throws Exception {
+ CambriaBatchingPublisher pub = mock(CambriaBatchingPublisher.class);
+ CambriaPublisherWrapper cambria = new CambriaPublisherWrapper(makeBuilder().build());
+ cambria.publisher = pub;
+
+ assertTrue(cambria.send(MY_PARTITION, MY_MESSAGE));
+
+ // publisher exception
+ when(pub.send(anyString(), anyString())).thenThrow(new IOException(EXPECTED));
+ assertFalse(cambria.send(MY_PARTITION2, MY_MESSAGE2));
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testCambriaPublisherWrapperSend_InvalidMsg() {
+ CambriaPublisherWrapper cambria = new CambriaPublisherWrapper(makeBuilder().build());
+ cambria.publisher = mock(CambriaBatchingPublisher.class);
+
+ cambria.send(MY_PARTITION, null);
+ }
+
+ @Test
+ public void testCambriaPublisherWrapperClose() throws Exception {
+ CambriaBatchingPublisher pub = mock(CambriaBatchingPublisher.class);
+ CambriaPublisherWrapper cambria = new CambriaPublisherWrapper(makeBuilder().build());
+ cambria.publisher = pub;
+
+ cambria.close();
+ verify(pub).close();
+
+ // try again, this time with an exception
+ doThrow(new RuntimeException(EXPECTED)).when(pub).close();
+ cambria.close();
+ }
+
+ @Test
+ public void testDmaapPublisherWrapper() {
+ // verify with different constructor arguments
+ new DmaapAafPublisherWrapper(servers, MY_TOPIC, MY_USERNAME, MY_PASSWD, true);
+ new DmaapAafPublisherWrapper(servers, MY_TOPIC, MY_USERNAME, MY_PASSWD, false);
+ new DmaapPublisherWrapper(ProtocolTypeConstants.DME2, servers, MY_TOPIC, MY_USERNAME, MY_PASSWD, true) {};
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testDmaapPublisherWrapper_InvalidTopic() {
+ new DmaapPublisherWrapper(ProtocolTypeConstants.DME2, servers, "", MY_USERNAME, MY_PASSWD, true) {};
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testDmaapPublisherWrapper_Aaf_NullServers() {
+ new DmaapAafPublisherWrapper(null, MY_TOPIC, MY_USERNAME, MY_PASSWD, true);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testDmaapPublisherWrapper_Aaf_NoServers() {
+ new DmaapAafPublisherWrapper(Collections.emptyList(), MY_TOPIC, MY_USERNAME, MY_PASSWD, true);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testDmaapPublisherWrapper_InvalidProtocol() {
+ new DmaapPublisherWrapper(ProtocolTypeConstants.HTTPNOAUTH, servers, MY_TOPIC, MY_USERNAME, MY_PASSWD, true) {};
+ }
+
+ @Test
+ public void testDmaapPublisherWrapperClose() throws Exception {
+ MRSimplerBatchPublisher pub = mock(MRSimplerBatchPublisher.class);
+ DmaapPublisherWrapper dmaap = new DmaapAafPublisherWrapper(servers, MY_TOPIC, MY_USERNAME, MY_PASSWD, true);
+ dmaap.publisher = pub;
+
+ dmaap.close();
+ verify(pub).close(anyLong(), any(TimeUnit.class));
+
+ // close, but with exception from publisher
+ doThrow(new IOException(EXPECTED)).when(pub).close(anyLong(), any(TimeUnit.class));
+ dmaap.close();
+ }
+
+ @Test
+ public void testDmaapPublisherWrapperSend() throws Exception {
+ MRSimplerBatchPublisher pub = mock(MRSimplerBatchPublisher.class);
+ DmaapPublisherWrapper dmaap = new DmaapAafPublisherWrapper(servers, MY_TOPIC, MY_USERNAME, MY_PASSWD, true);
+ dmaap.publisher = pub;
+
+ // null response
+ assertTrue(dmaap.send(MY_PARTITION, MY_MESSAGE));
+ verify(pub).setPubResponse(any(MRPublisherResponse.class));
+ verify(pub).send(MY_PARTITION, MY_MESSAGE);
+
+ // with response
+ pub = mock(MRSimplerBatchPublisher.class);
+ dmaap.publisher = pub;
+
+ MRPublisherResponse resp = new MRPublisherResponse();
+ when(pub.sendBatchWithResponse()).thenReturn(resp);
+ assertTrue(dmaap.send(MY_PARTITION, MY_MESSAGE));
+ verify(pub).setPubResponse(any(MRPublisherResponse.class));
+ verify(pub).send(MY_PARTITION, MY_MESSAGE);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testDmaapPublisherWrapperSend_NullMessage() throws Exception {
+ MRSimplerBatchPublisher pub = mock(MRSimplerBatchPublisher.class);
+ DmaapPublisherWrapper dmaap = new DmaapAafPublisherWrapper(servers, MY_TOPIC, MY_USERNAME, MY_PASSWD, true);
+ dmaap.publisher = pub;
+
+ dmaap.send(MY_PARTITION, null);
+ }
+
+ @Test
+ public void testDmaapDmePublisherWrapper() {
+ // verify with different parameters
+ new DmaapDmePublisherWrapper(makeBuilder().build());
+ new DmaapDmePublisherWrapper(makeBuilder().additionalProps(null).build());
+
+ addProps.put(ROUTE_PROP, MY_ROUTE);
+ new DmaapDmePublisherWrapper(makeBuilder().build());
+ new DmaapDmePublisherWrapper(makeBuilder().partner(null).build());
+
+ addProps.put("null-value", null);
+ new DmaapDmePublisherWrapper(makeBuilder().build());
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testDmaapDmePublisherWrapper_InvalidEnv() {
+ new DmaapDmePublisherWrapper(makeBuilder().environment(null).build());
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testDmaapDmePublisherWrapper_InvalidAft() {
+ new DmaapDmePublisherWrapper(makeBuilder().aftEnvironment(null).build());
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testDmaapDmePublisherWrapper_InvalidLat() {
+ new DmaapDmePublisherWrapper(makeBuilder().latitude(null).build());
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testDmaapDmePublisherWrapper_InvalidLong() {
+ new DmaapDmePublisherWrapper(makeBuilder().longitude(null).build());
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testDmaapDmePublisherWrapper_InvalidPartner() {
+ new DmaapDmePublisherWrapper(makeBuilder().partner(null).build());
+ }
+}
diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusTopicBaseTest.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusTopicBaseTest.java
new file mode 100644
index 00000000..01e2e61e
--- /dev/null
+++ b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusTopicBaseTest.java
@@ -0,0 +1,124 @@
+/*
+ * ============LICENSE_START=======================================================
+ * policy-endpoints
+ * ================================================================================
+ * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.common.endpoints.event.comm.bus.internal;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.onap.policy.common.endpoints.event.comm.bus.BusTopicTestBase;
+
+public class BusTopicBaseTest extends BusTopicTestBase {
+
+ private BusTopicBaseImpl base;
+
+ /**
+ * Initializes the object to be tested.
+ */
+ @Before
+ public void setUp() {
+ super.setUp();
+
+ base = new BusTopicBaseImpl(builder.build());
+ }
+
+ @Test
+ public void testToString() {
+ assertNotNull(base.toString());
+ }
+
+ @Test
+ public void testGetApiKey() {
+ assertEquals(MY_API_KEY, base.getApiKey());
+ }
+
+ @Test
+ public void testGetApiSecret() {
+ assertEquals(MY_API_SECRET, base.getApiSecret());
+ }
+
+ @Test
+ public void testIsUseHttps() {
+ assertEquals(true, base.isUseHttps());
+ assertEquals(false, new BusTopicBaseImpl(builder.useHttps(false).build()).isUseHttps());
+ }
+
+ @Test
+ public void testIsAllowSelfSignedCerts() {
+ assertEquals(true, base.isAllowSelfSignedCerts());
+ assertEquals(false, new BusTopicBaseImpl(builder.allowSelfSignedCerts(false).build()).isAllowSelfSignedCerts());
+ }
+
+ @Test
+ public void testAnyNullOrEmpty() {
+ assertFalse(base.anyNullOrEmpty());
+ assertFalse(base.anyNullOrEmpty("any-none-null", "any-none-null-B"));
+
+ assertTrue(base.anyNullOrEmpty(null, "any-first-null"));
+ assertTrue(base.anyNullOrEmpty("any-middle-null", null, "any-middle-null-B"));
+ assertTrue(base.anyNullOrEmpty("any-last-null", null));
+ assertTrue(base.anyNullOrEmpty("any-empty", ""));
+ }
+
+ @Test
+ public void testAllNullOrEmpty() {
+ assertTrue(base.allNullOrEmpty());
+ assertTrue(base.allNullOrEmpty(""));
+ assertTrue(base.allNullOrEmpty(null, ""));
+
+ assertFalse(base.allNullOrEmpty("all-ok-only-one"));
+ assertFalse(base.allNullOrEmpty("all-ok-one", "all-ok-two"));
+ assertFalse(base.allNullOrEmpty("all-ok-null", null));
+ assertFalse(base.allNullOrEmpty("", "all-ok-empty"));
+ assertFalse(base.allNullOrEmpty("", "all-one-ok", null));
+ }
+
+ private static class BusTopicBaseImpl extends BusTopicBase {
+
+ public BusTopicBaseImpl(BusTopicParams busTopicParams) {
+ super(busTopicParams);
+ }
+
+ @Override
+ public CommInfrastructure getTopicCommInfrastructure() {
+ return CommInfrastructure.NOOP;
+ }
+
+ @Override
+ public boolean start() {
+ return true;
+ }
+
+ @Override
+ public boolean stop() {
+ return true;
+ }
+
+ @Override
+ public void shutdown() {
+ // do nothing
+ }
+
+ }
+}
diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusTopicParamsTest.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusTopicParamsTest.java
new file mode 100644
index 00000000..d56374fc
--- /dev/null
+++ b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusTopicParamsTest.java
@@ -0,0 +1,148 @@
+/*
+ * ============LICENSE_START=======================================================
+ * policy-endpoints
+ * ================================================================================
+ * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.common.endpoints.event.comm.bus.internal;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.function.BiConsumer;
+import org.junit.Before;
+import org.junit.Test;
+import org.onap.policy.common.endpoints.event.comm.bus.BusTopicTestBase;
+import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams.TopicParamsBuilder;
+
+public class BusTopicParamsTest extends BusTopicTestBase {
+
+ @Before
+ public void setUp() {
+ super.setUp();
+ }
+
+ @Test
+ public void test() {
+ BusTopicParams params = makeBuilder().build();
+
+ assertEquals(addProps, params.getAdditionalProps());
+ assertEquals(MY_AFT_ENV, params.getAftEnvironment());
+ assertEquals(true, params.isAllowSelfSignedCerts());
+ assertEquals(MY_API_KEY, params.getApiKey());
+ assertEquals(MY_API_SECRET, params.getApiSecret());
+ assertEquals(MY_BASE_PATH, params.getBasePath());
+ assertEquals(MY_CLIENT_NAME, params.getClientName());
+ assertEquals(MY_CONS_GROUP, params.getConsumerGroup());
+ assertEquals(MY_CONS_INST, params.getConsumerInstance());
+ assertEquals(MY_ENV, params.getEnvironment());
+ assertEquals(MY_FETCH_LIMIT, params.getFetchLimit());
+ assertEquals(MY_FETCH_TIMEOUT, params.getFetchTimeout());
+ assertEquals(MY_HOST, params.getHostname());
+ assertEquals(MY_LAT, params.getLatitude());
+ assertEquals(MY_LONG, params.getLongitude());
+ assertEquals(true, params.isManaged());
+ assertEquals(MY_PARTITION, params.getPartitionId());
+ assertEquals(MY_PARTNER, params.getPartner());
+ assertEquals(MY_PASSWD, params.getPassword());
+ assertEquals(MY_PORT, params.getPort());
+ assertEquals(servers, params.getServers());
+ assertEquals(MY_TOPIC, params.getTopic());
+ assertEquals(true, params.isUseHttps());
+ assertEquals(MY_USERNAME, params.getUserName());
+
+ // ensure that booleans are independent of each other
+ testBoolean("true:false:false", (bldr, flag) -> bldr.allowSelfSignedCerts(flag));
+ testBoolean("false:true:false", (bldr, flag) -> bldr.managed(flag));
+ testBoolean("false:false:true", (bldr, flag) -> bldr.useHttps(flag));
+
+ // test validity methods
+ assertTrue(params.isAdditionalPropsValid());
+ assertFalse(params.isAftEnvironmentInvalid());
+ assertTrue(params.isApiKeyValid());
+ assertTrue(params.isApiSecretValid());
+ assertFalse(params.isClientNameInvalid());
+ assertFalse(params.isConsumerGroupInvalid());
+ assertFalse(params.isConsumerInstanceInvalid());
+ assertFalse(params.isEnvironmentInvalid());
+ assertFalse(params.isHostnameInvalid());
+ assertFalse(params.isLatitudeInvalid());
+ assertFalse(params.isLongitudeInvalid());
+ assertFalse(params.isPartitionIdInvalid());
+ assertFalse(params.isPartnerInvalid());
+ assertTrue(params.isPasswordValid());
+ assertFalse(params.isPortInvalid());
+ assertFalse(params.isServersInvalid());
+ assertFalse(params.isTopicInvalid());
+ assertTrue(params.isUserNameValid());
+
+ // test inverted validity
+ assertFalse(makeBuilder().additionalProps(null).build().isAdditionalPropsValid());
+ assertTrue(makeBuilder().aftEnvironment("").build().isAftEnvironmentInvalid());
+ assertFalse(makeBuilder().apiKey("").build().isApiKeyValid());
+ assertFalse(makeBuilder().apiSecret("").build().isApiSecretValid());
+ assertTrue(makeBuilder().clientName("").build().isClientNameInvalid());
+ assertTrue(makeBuilder().consumerGroup("").build().isConsumerGroupInvalid());
+ assertTrue(makeBuilder().consumerInstance("").build().isConsumerInstanceInvalid());
+ assertTrue(makeBuilder().environment("").build().isEnvironmentInvalid());
+ assertTrue(makeBuilder().hostname("").build().isHostnameInvalid());
+ assertTrue(makeBuilder().latitude("").build().isLatitudeInvalid());
+ assertTrue(makeBuilder().longitude("").build().isLongitudeInvalid());
+ assertTrue(makeBuilder().partitionId("").build().isPartitionIdInvalid());
+ assertTrue(makeBuilder().partner("").build().isPartnerInvalid());
+ assertFalse(makeBuilder().password("").build().isPasswordValid());
+ assertTrue(makeBuilder().port(-1).build().isPortInvalid());
+ assertTrue(makeBuilder().port(65536).build().isPortInvalid());
+ assertTrue(makeBuilder().servers(null).build().isServersInvalid());
+ assertTrue(makeBuilder().servers(new LinkedList<>()).build().isServersInvalid());
+ assertTrue(makeBuilder().servers(Arrays.asList("")).build().isServersInvalid());
+ assertFalse(makeBuilder().servers(Arrays.asList("one-server")).build().isServersInvalid());
+ assertTrue(makeBuilder().topic("").build().isTopicInvalid());
+ assertFalse(makeBuilder().userName("").build().isUserNameValid());
+ }
+
+ /**
+ * Tests the boolean methods by applying a function, once with {@code false} and once
+ * with {@code true}. Verifies that all of the boolean methods return the correct
+ * value by concatenating them.
+ *
+ * @param expectedTrue the string that is expected when {@code true} is passed to the
+ * method
+ * @param function function to be applied to the builder
+ */
+ private void testBoolean(String expectedTrue, BiConsumer<TopicParamsBuilder, Boolean> function) {
+ TopicParamsBuilder builder = BusTopicParams.builder();
+
+ // first try the "false" case
+ function.accept(builder, false);
+
+ BusTopicParams params = builder.build();
+ assertEquals("false:false:false",
+ "" + params.isAllowSelfSignedCerts() + ":" + params.isManaged() + ":" + params.isUseHttps());
+
+
+ // now try the "true" case
+ function.accept(builder, true);
+
+ params = builder.build();
+ assertEquals(expectedTrue,
+ "" + params.isAllowSelfSignedCerts() + ":" + params.isManaged() + ":" + params.isUseHttps());
+ }
+}
diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/TopicBaseTest.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/TopicBaseTest.java
new file mode 100644
index 00000000..4634d125
--- /dev/null
+++ b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/bus/internal/TopicBaseTest.java
@@ -0,0 +1,295 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP Policy Engine - Common Modules
+ * ================================================================================
+ * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.common.endpoints.event.comm.bus.internal;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import org.junit.Before;
+import org.junit.Test;
+import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
+import org.onap.policy.common.endpoints.event.comm.TopicListener;
+import org.onap.policy.common.endpoints.event.comm.bus.BusTopicTestBase;
+
+public class TopicBaseTest extends BusTopicTestBase {
+
+ private TopicBaseImpl base;
+
+ /**
+ * Creates the object to be tested.
+ */
+ @Before
+ public void setUp() {
+ super.setUp();
+
+ base = new TopicBaseImpl(servers, MY_TOPIC);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testTopicBase_NullServers() {
+ new TopicBaseImpl(null, MY_TOPIC);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testTopicBase_EmptyServers() {
+ new TopicBaseImpl(Collections.emptyList(), MY_TOPIC);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testTopicBase_NullTopic() {
+ new TopicBaseImpl(servers, null);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testTopicBase_EmptyTopic() {
+ new TopicBaseImpl(servers, "");
+ }
+
+ @Test
+ public void testRegister() {
+ TopicListener listener = mock(TopicListener.class);
+ base.register(listener);
+ assertEquals(Arrays.asList(listener), base.snapshotTopicListeners());
+
+ // re-register - list should be unchanged
+ base.register(listener);
+ assertEquals(Arrays.asList(listener), base.snapshotTopicListeners());
+
+ // register a new listener
+ TopicListener listener2 = mock(TopicListener.class);
+ base.register(listener2);
+ assertEquals(Arrays.asList(listener, listener2), base.snapshotTopicListeners());
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testRegister_NullListener() {
+ base.register(null);
+ }
+
+ @Test
+ public void testUnregister() {
+ // register two listeners
+ TopicListener listener = mock(TopicListener.class);
+ TopicListener listener2 = mock(TopicListener.class);
+ base.register(listener);
+ base.register(listener2);
+
+ // unregister one
+ base.unregister(listener);
+ assertEquals(Arrays.asList(listener2), base.snapshotTopicListeners());
+
+ // unregister the other
+ base.unregister(listener2);
+ assertTrue(base.snapshotTopicListeners().isEmpty());
+
+ // unregister again
+ base.unregister(listener2);
+ assertTrue(base.snapshotTopicListeners().isEmpty());
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testUnregister_NullListener() {
+ base.register(mock(TopicListener.class));
+ base.unregister(null);
+ }
+
+ @Test
+ public void testBroadcast() {
+ // register two listeners
+ TopicListener listener = mock(TopicListener.class);
+ TopicListener listener2 = mock(TopicListener.class);
+ base.register(listener);
+ base.register(listener2);
+
+ // broadcast a message
+ final String msg1 = "message-A";
+ base.broadcast(msg1);
+ verify(listener).onTopicEvent(CommInfrastructure.NOOP, MY_TOPIC, msg1);
+ verify(listener2).onTopicEvent(CommInfrastructure.NOOP, MY_TOPIC, msg1);
+
+ // broadcast another message, with an exception
+ final String msg2 = "message-B";
+ doThrow(new RuntimeException(EXPECTED)).when(listener).onTopicEvent(any(), any(), any());
+ base.broadcast(msg2);
+ verify(listener).onTopicEvent(CommInfrastructure.NOOP, MY_TOPIC, msg2);
+ verify(listener2).onTopicEvent(CommInfrastructure.NOOP, MY_TOPIC, msg2);
+ }
+
+ @Test
+ public void testLock_testUnlock() {
+ assertFalse(base.isLocked());
+ assertTrue(base.lock());
+ assertEquals(0, base.startCount);
+ assertEquals(1, base.stopCount);
+
+ // lock again - should not stop again
+ assertTrue(base.isLocked());
+ assertTrue(base.lock());
+ assertEquals(0, base.startCount);
+ assertEquals(1, base.stopCount);
+
+ assertTrue(base.isLocked());
+ assertTrue(base.unlock());
+ assertEquals(1, base.startCount);
+ assertEquals(1, base.stopCount);
+
+ // unlock again - should not start again
+ assertFalse(base.isLocked());
+ assertTrue(base.unlock());
+ assertEquals(1, base.startCount);
+ assertEquals(1, base.stopCount);
+
+ // lock, but stop returns false
+ base = new TopicBaseImpl(servers, MY_TOPIC);
+ base.stopReturn = false;
+ assertFalse(base.lock());
+ assertTrue(base.isLocked());
+ assertTrue(base.lock());
+
+ // unlock, but start returns false
+ base.startReturn = false;
+ assertFalse(base.unlock());
+ assertFalse(base.isLocked());
+ assertTrue(base.unlock());
+
+ // lock & re-lock, but start throws an exception
+ base = new TopicBaseImpl(servers, MY_TOPIC);
+ base.startEx = true;
+ assertTrue(base.lock());
+ assertFalse(base.unlock());
+ assertFalse(base.isLocked());
+ assertTrue(base.unlock());
+ }
+
+ @Test
+ public void testIsLocked() {
+ assertFalse(base.isLocked());
+ base.lock();
+ assertTrue(base.isLocked());
+ base.unlock();
+ assertFalse(base.isLocked());
+ }
+
+ @Test
+ public void testGetTopic() {
+ assertEquals(MY_TOPIC, base.getTopic());
+ }
+
+ @Test
+ public void testIsAlive() {
+ assertFalse(base.isAlive());
+ base.start();
+ assertTrue(base.isAlive());
+ base.stop();
+ assertFalse(base.isAlive());
+ }
+
+ @Test
+ public void testGetServers() {
+ assertEquals(servers, base.getServers());
+ }
+
+ @Test
+ public void testGetRecentEvents() {
+ assertEquals(0, base.getRecentEvents().length);
+
+ base.addEvent("recent-A");
+ base.addEvent("recent-B");
+
+ String[] recent = base.getRecentEvents();
+ assertEquals(2, recent.length);
+ assertEquals("recent-A", recent[0]);
+ assertEquals("recent-B", recent[1]);
+ }
+
+ @Test
+ public void testToString() {
+ assertNotNull(base.toString());
+ }
+
+ /**
+ * Implementation of TopicBase.
+ */
+ private static class TopicBaseImpl extends TopicBase {
+ private int startCount = 0;
+ private int stopCount = 0;
+ private boolean startReturn = true;
+ private boolean stopReturn = true;
+ private boolean startEx = false;
+
+ /**
+ * Constructor.
+ *
+ * @param servers list of servers
+ * @param topic topic name
+ */
+ public TopicBaseImpl(List<String> servers, String topic) {
+ super(servers, topic);
+ }
+
+ @Override
+ public CommInfrastructure getTopicCommInfrastructure() {
+ return CommInfrastructure.NOOP;
+ }
+
+ @Override
+ public boolean start() {
+ ++startCount;
+
+ if (startEx) {
+ throw new RuntimeException(EXPECTED);
+ }
+
+ alive = true;
+ return startReturn;
+ }
+
+ @Override
+ public boolean stop() {
+ ++stopCount;
+ alive = false;
+ return stopReturn;
+ }
+
+ @Override
+ public void shutdown() {
+ // do nothing
+ }
+
+ /**
+ * Adds an event to the list of recent events.
+ *
+ * @param event event to be added
+ */
+ public void addEvent(String event) {
+ recentEvents.add(event);
+ }
+ }
+}