aboutsummaryrefslogtreecommitdiffstats
path: root/models-sim/models-sim-dmaap/src/test
diff options
context:
space:
mode:
authorJim Hahn <jrh3@att.com>2019-07-31 09:45:27 -0400
committerJim Hahn <jrh3@att.com>2019-10-07 12:04:25 -0400
commitaa148d9b5bba6ad23736e939a6d0ec917e761e1e (patch)
tree5c3f0d4e552b1cfa43b3ff0ed8b956bee3d31e68 /models-sim/models-sim-dmaap/src/test
parent5af913104ec412086deab4d599359751246e4ba3 (diff)
Flesh out DMaaP simulator
Fleshed out the dmaap simulator with additional media types. Added more junit tests. Added buildDmaapSim() to simulators Util. Change-Id: I39acd3df8e8d0ded21228e56fa1ef919cafc3772 Issue-ID: POLICY-2144 Signed-off-by: Jim Hahn <jrh3@att.com>
Diffstat (limited to 'models-sim/models-sim-dmaap/src/test')
-rw-r--r--models-sim/models-sim-dmaap/src/test/java/org/onap/policy/models/sim/dmaap/DmaapSimXxxExceptionTest.java39
-rw-r--r--models-sim/models-sim-dmaap/src/test/java/org/onap/policy/models/sim/dmaap/provider/ConsumerGroupDataTest.java305
-rw-r--r--models-sim/models-sim-dmaap/src/test/java/org/onap/policy/models/sim/dmaap/provider/DmaapSimProviderTest.java287
-rw-r--r--models-sim/models-sim-dmaap/src/test/java/org/onap/policy/models/sim/dmaap/provider/TopicDataTest.java213
-rw-r--r--models-sim/models-sim-dmaap/src/test/java/org/onap/policy/models/sim/dmaap/rest/BaseRestControllerV1Test.java63
-rw-r--r--models-sim/models-sim-dmaap/src/test/java/org/onap/policy/models/sim/dmaap/rest/CambriaMessageBodyHandlerTest.java145
-rw-r--r--models-sim/models-sim-dmaap/src/test/java/org/onap/policy/models/sim/dmaap/rest/CommonRestServer.java181
-rw-r--r--models-sim/models-sim-dmaap/src/test/java/org/onap/policy/models/sim/dmaap/rest/DmaapSimRestControllerV1Test.java94
-rw-r--r--models-sim/models-sim-dmaap/src/test/java/org/onap/policy/models/sim/dmaap/rest/TextMessageBodyHandlerTest.java81
-rw-r--r--models-sim/models-sim-dmaap/src/test/java/org/onap/policy/sim/dmaap/e2e/EndToEndTest.java199
-rw-r--r--models-sim/models-sim-dmaap/src/test/java/org/onap/policy/sim/dmaap/parameters/CommonTestData.java89
-rw-r--r--models-sim/models-sim-dmaap/src/test/java/org/onap/policy/sim/dmaap/parameters/DmaapSimParameterGroupTest.java34
-rw-r--r--models-sim/models-sim-dmaap/src/test/java/org/onap/policy/sim/dmaap/parameters/DmaapSimParameterHandlerTest.java70
-rw-r--r--models-sim/models-sim-dmaap/src/test/java/org/onap/policy/sim/dmaap/startstop/DmaapSimActivatorTest.java95
-rw-r--r--models-sim/models-sim-dmaap/src/test/java/org/onap/policy/sim/dmaap/startstop/MainTest.java100
-rw-r--r--models-sim/models-sim-dmaap/src/test/resources/parameters/EmptyParameterFile.json0
-rw-r--r--models-sim/models-sim-dmaap/src/test/resources/parameters/MinimumParameters.json3
-rw-r--r--models-sim/models-sim-dmaap/src/test/resources/parameters/NormalParameters.json1
-rw-r--r--models-sim/models-sim-dmaap/src/test/resources/parameters/Parameters_InvalidName.json1
-rw-r--r--models-sim/models-sim-dmaap/src/test/resources/parameters/TopicParameters.json36
20 files changed, 2035 insertions, 1 deletions
diff --git a/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/models/sim/dmaap/DmaapSimXxxExceptionTest.java b/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/models/sim/dmaap/DmaapSimXxxExceptionTest.java
new file mode 100644
index 000000000..4e37a5e36
--- /dev/null
+++ b/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/models/sim/dmaap/DmaapSimXxxExceptionTest.java
@@ -0,0 +1,39 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.models.sim.dmaap;
+
+import static org.junit.Assert.assertEquals;
+
+import org.junit.Test;
+import org.onap.policy.common.utils.test.ExceptionsTester;
+import org.onap.policy.models.sim.dmaap.DmaapSimException;
+import org.onap.policy.models.sim.dmaap.DmaapSimRuntimeException;
+
+public class DmaapSimXxxExceptionTest {
+
+ @Test
+ public void testDmaapSimException() {
+ assertEquals(3, new ExceptionsTester().test(DmaapSimException.class));
+ }
+
+ @Test
+ public void testDmaapSimRuntimeException() {
+ assertEquals(3, new ExceptionsTester().test(DmaapSimRuntimeException.class));
+ }
+}
diff --git a/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/models/sim/dmaap/provider/ConsumerGroupDataTest.java b/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/models/sim/dmaap/provider/ConsumerGroupDataTest.java
new file mode 100644
index 000000000..4513ffb82
--- /dev/null
+++ b/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/models/sim/dmaap/provider/ConsumerGroupDataTest.java
@@ -0,0 +1,305 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP Policy Models
+ * ================================================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.models.sim.dmaap.provider;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class ConsumerGroupDataTest {
+ private static final int WAIT_MS = 5000;
+ private static final int MIN_WAIT_MS = WAIT_MS / 2;
+ private static final String MY_TOPIC = "my-topic";
+ private static final String MY_CONSUMER = "my-consumer";
+ private static final String MSG1 = "hello";
+ private static final String MSG2 = "there";
+ private static final String MSG3 = "world";
+ private static final int MAX_THREADS = 30;
+
+ private MyData data;
+ private MyReader thread;
+ private List<MyReader> threads;
+
+ /**
+ * Sets up.
+ */
+ @Before
+ public void setUp() {
+ data = new MyData();
+ thread = null;
+ threads = new ArrayList<>(MAX_THREADS);
+ }
+
+ /**
+ * Stops any running thread.
+ */
+ @After
+ public void tearDown() {
+ for (MyReader thr : threads) {
+ thr.interrupt();
+ }
+
+ for (MyReader thr : threads) {
+ thr.await();
+ }
+ }
+
+ @Test
+ public void testShouldRemove() throws InterruptedException {
+ assertFalse(data.shouldRemove());
+ assertTrue(data.shouldRemove());
+
+ data = new MyData();
+
+ // start a reader thread and wait for it to poll its queue
+ startReader(0, 10);
+ assertTrue(data.await());
+
+ assertFalse(data.shouldRemove());
+ }
+
+ @Test
+ public void testRead() {
+ data.enqueue(MSG1, MSG2, MSG3, MSG1, MSG2, MSG3);
+
+ // this reader only wants one
+ startReader(1, 1);
+ assertTrue(thread.await());
+ assertEquals("[hello]", thread.result.toString());
+
+ // this reader wants three
+ startReader(3, 1);
+ assertTrue(thread.await());
+ assertEquals("[there, world, hello]", thread.result.toString());
+
+ // this reader wants three, but will only get two
+ startReader(3, 1);
+ assertTrue(thread.await());
+ assertEquals("[there, world]", thread.result.toString());
+ }
+
+ @Test
+ public void testRead_Idle() throws InterruptedException {
+ // force it to idle
+ data.shouldRemove();
+ data.shouldRemove();
+
+ long tbeg = System.currentTimeMillis();
+ assertSame(ConsumerGroupData.UNREADABLE_LIST, data.read(1, WAIT_MS));
+
+ // should not have waited
+ assertTrue(System.currentTimeMillis() < tbeg + MIN_WAIT_MS);
+ }
+
+ @Test
+ public void testRead_NegativeCount() throws InterruptedException {
+ data.enqueue(MSG1, MSG2);
+ startReader(-1, 3);
+ assertTrue(data.await());
+
+ // wait time should be unaffected
+ assertEquals(3L, data.waitMs2);
+
+ assertTrue(thread.await());
+
+ // should only return one message
+ assertEquals("[hello]", thread.result.toString());
+ }
+
+ @Test
+ public void testRead_NegativeWait() throws InterruptedException {
+ data.enqueue(MSG1, MSG2, MSG3);
+ startReader(2, -3);
+ assertTrue(data.await());
+
+ assertEquals(0L, data.waitMs2);
+
+ assertTrue(thread.await());
+
+ // should return two messages, as requested
+ assertEquals("[hello, there]", thread.result.toString());
+ }
+
+ @Test
+ public void testRead_NoMessages() throws InterruptedException {
+ startReader(0, 0);
+ assertTrue(data.await());
+
+ assertTrue(thread.await());
+ assertTrue(thread.result.isEmpty());
+ }
+
+ @Test
+ public void testRead_MultiThreaded() {
+ // queue up a bunch of messages
+ final int expected = MAX_THREADS * 3;
+ for (int x = 0; x < expected; ++x) {
+ data.enqueue(MSG1);
+ }
+
+ for (int x = 0; x < MAX_THREADS; ++x) {
+ startReader(4, 1);
+ }
+
+ int actual = 0;
+ for (MyReader thr : threads) {
+ thr.await();
+ actual += thr.result.size();
+ }
+
+ assertEquals(expected, actual);
+ }
+
+
+ /**
+ * Starts a reader thread.
+ *
+ * @param limit number of messages to read at one time
+ * @param waitMs wait time, in milliseconds
+ */
+ private void startReader(int limit, long waitMs) {
+ thread = new MyReader(limit, waitMs);
+
+ thread.setDaemon(true);
+ thread.start();
+
+ threads.add(thread);
+ }
+
+
+ private class MyData extends ConsumerGroupData {
+
+ /**
+ * Decremented when {@link #getNextMessage(long)} is invoked.
+ */
+ private final CountDownLatch latch = new CountDownLatch(1);
+
+ /**
+ * Messages to be added to the queue when {@link #getNextMessage(long)} is
+ * invoked.
+ */
+ private final List<String> messages = new ArrayList<>();
+
+ /**
+ * Value passed to {@link #getNextMessage(long)}.
+ */
+ private volatile long waitMs2 = -1;
+
+ /**
+ * Constructs the object.
+ */
+ public MyData() {
+ super(MY_TOPIC, MY_CONSUMER);
+ }
+
+ /**
+ * Arranges for messages to be injected into the queue the next time
+ * {@link #getNextMessage(long)} is invoked.
+ *
+ * @param messages the messages to be injected
+ */
+ public void enqueue(String... messages) {
+ this.messages.addAll(Arrays.asList(messages));
+ }
+
+ @Override
+ protected String getNextMessage(long waitMs) throws InterruptedException {
+ waitMs2 = waitMs;
+
+ latch.countDown();
+
+ synchronized (messages) {
+ write(messages);
+ messages.clear();
+ }
+
+ return super.getNextMessage(waitMs);
+ }
+
+ /**
+ * Waits for {@link #getNextMessage(long)} to be invoked.
+ *
+ * @return {@code true} if {@link #getNextMessage(long)} was invoked,
+ * {@code false} if the timer expired first
+ * @throws InterruptedException if the current thread is interrupted while waiting
+ */
+ public boolean await() throws InterruptedException {
+ return latch.await(WAIT_MS, TimeUnit.MILLISECONDS);
+ }
+ }
+
+ /**
+ * Thread that will invoke the consumer group's read() method one time.
+ */
+ private class MyReader extends Thread {
+ private final ConsumerGroupData group = data;
+ private final int limit;
+ private final long waitMs;
+
+ /**
+ * Result returned by the read() method.
+ */
+ private List<String> result = Collections.emptyList();
+
+ public MyReader(int limit, long waitMs) {
+ this.limit = limit;
+ this.waitMs = waitMs;
+ }
+
+ @Override
+ public void run() {
+ try {
+ result = group.read(limit, waitMs);
+
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ /**
+ * Waits for the thread to complete.
+ *
+ * @return {@code true} if the thread completed, {@code false} if the thread is
+ * still running
+ */
+ public boolean await() {
+ try {
+ this.join(WAIT_MS);
+
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+
+ return !this.isAlive();
+ }
+ }
+}
diff --git a/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/models/sim/dmaap/provider/DmaapSimProviderTest.java b/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/models/sim/dmaap/provider/DmaapSimProviderTest.java
new file mode 100644
index 000000000..f8c141614
--- /dev/null
+++ b/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/models/sim/dmaap/provider/DmaapSimProviderTest.java
@@ -0,0 +1,287 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.models.sim.dmaap.provider;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.Response.Status;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.onap.policy.common.utils.coder.CoderException;
+import org.onap.policy.common.utils.coder.StandardCoder;
+import org.onap.policy.common.utils.coder.StandardCoderObject;
+import org.onap.policy.models.sim.dmaap.parameters.DmaapSimParameterGroup;
+
+public class DmaapSimProviderTest {
+ private static final String EXPECTED_EXCEPTION = "expected exception";
+ private static final long SWEEP_SEC = 10L;
+ private static final String TOPIC1 = "topic-A";
+ private static final String TOPIC2 = "topic-B";
+ private static final String CONSUMER1 = "consumer-X";
+ private static final String CONSUMER_ID1 = "id1";
+
+ private MyProvider prov;
+
+ @Mock
+ private DmaapSimParameterGroup params;
+
+ @Mock
+ private ScheduledExecutorService timer;
+
+ @Mock
+ private TopicData data1;
+
+ @Mock
+ private TopicData data2;
+
+ @Captor
+ private ArgumentCaptor<List<Object>> listCaptor;
+
+ /**
+ * Sets up.
+ */
+ @Before
+ public void setUp() {
+ MockitoAnnotations.initMocks(this);
+
+ when(params.getTopicSweepSec()).thenReturn(SWEEP_SEC);
+
+ prov = new MyProvider(params);
+ }
+
+ /**
+ * Shuts down the provider, if it's running.
+ */
+ @After
+ public void tearDown() {
+ if (prov.isAlive()) {
+ prov.shutdown();
+ }
+ }
+
+ /**
+ * Verifies that the constructor adds all of the expected actions to the service
+ * manager container.
+ */
+ @Test
+ public void testDmaapSimProvider() {
+ prov.start();
+ verify(timer).scheduleWithFixedDelay(any(), eq(SWEEP_SEC), eq(SWEEP_SEC), eq(TimeUnit.SECONDS));
+
+ prov.stop();
+ verify(timer).shutdown();
+ }
+
+ @Test
+ public void testProcessDmaapMessagePut_List() throws CoderException {
+ prov = spy(new MyProvider(params));
+
+ when(data1.write(any())).thenReturn(2);
+
+ // force topics to exist
+ prov.processDmaapMessageGet(TOPIC1, CONSUMER1, CONSUMER_ID1, 1, 0);
+ prov.processDmaapMessageGet(TOPIC2, CONSUMER1, CONSUMER_ID1, 1, 0);
+
+ List<Object> lst = Arrays.asList("hello", "world");
+ Response resp = prov.processDmaapMessagePut(TOPIC1, lst);
+ assertEquals(Status.OK.getStatusCode(), resp.getStatus());
+ StandardCoderObject sco = new StandardCoder().decode(resp.getEntity().toString(), StandardCoderObject.class);
+ assertEquals("2", sco.getString("count"));
+
+ List<Object> lst2 = Arrays.asList("helloB", "worldB");
+ prov.processDmaapMessagePut(TOPIC1, lst2);
+ prov.processDmaapMessagePut(TOPIC2, lst2);
+
+ // should only invoke this once for each topic
+ verify(prov).makeTopicData(TOPIC1);
+ verify(prov).makeTopicData(TOPIC2);
+
+ // should process all writes
+ verify(data1).write(lst);
+ verify(data1).write(lst2);
+
+ verify(data2).write(lst2);
+ }
+
+ @Test
+ public void testProcessDmaapMessagePut_Single() throws CoderException {
+ prov = spy(new MyProvider(params));
+
+ // force topics to exist
+ prov.processDmaapMessageGet(TOPIC1, CONSUMER1, CONSUMER_ID1, 1, 0);
+ prov.processDmaapMessageGet(TOPIC2, CONSUMER1, CONSUMER_ID1, 1, 0);
+
+ final String value1 = "abc";
+ Response resp = prov.processDmaapMessagePut(TOPIC1, value1);
+ assertEquals(Status.OK.getStatusCode(), resp.getStatus());
+
+ // ensure that the response can be decoded
+ new StandardCoder().decode(resp.getEntity().toString(), StandardCoderObject.class);
+
+ final String value2 = "def";
+ prov.processDmaapMessagePut(TOPIC1, value2);
+ prov.processDmaapMessagePut(TOPIC2, value2);
+
+ // should only invoke this once for each topic
+ verify(prov).makeTopicData(TOPIC1);
+ verify(prov).makeTopicData(TOPIC2);
+
+ // should process all writes as singleton lists
+ listCaptor.getAllValues().clear();
+ verify(data1, times(2)).write(listCaptor.capture());
+ assertEquals(Collections.singletonList(value1), listCaptor.getAllValues().get(0));
+ assertEquals(Collections.singletonList(value2), listCaptor.getAllValues().get(1));
+
+ listCaptor.getAllValues().clear();
+ verify(data2).write(listCaptor.capture());
+ assertEquals(Collections.singletonList(value2), listCaptor.getAllValues().get(0));
+ }
+
+ @Test
+ public void testProcessDmaapMessageGet() throws InterruptedException {
+ List<String> msgs = Arrays.asList("400", "500");
+ when(data1.read(any(), anyInt(), anyLong())).thenReturn(msgs);
+
+ Response resp = prov.processDmaapMessageGet(TOPIC1, CONSUMER1, CONSUMER_ID1, 4, 400L);
+ assertEquals(Status.OK.getStatusCode(), resp.getStatus());
+ assertEquals(msgs.toString(), resp.getEntity().toString());
+ }
+
+ @Test
+ public void testProcessDmaapMessageGet_Timeout() throws InterruptedException {
+ when(data1.read(any(), anyInt(), anyLong())).thenReturn(Collections.emptyList());
+
+ Response resp = prov.processDmaapMessageGet(TOPIC1, CONSUMER1, CONSUMER_ID1, 3, 300L);
+ assertEquals(Status.REQUEST_TIMEOUT.getStatusCode(), resp.getStatus());
+ assertEquals("[]", resp.getEntity().toString());
+ }
+
+ @Test
+ public void testProcessDmaapMessageGet_Ex() throws InterruptedException {
+ BlockingQueue<Response> respQueue = new LinkedBlockingQueue<>();
+
+ // put in a background thread so it doesn't interrupt the tester thread
+ new Thread(() -> {
+ try {
+ when(data1.read(any(), anyInt(), anyLong())).thenThrow(new InterruptedException(EXPECTED_EXCEPTION));
+ respQueue.offer(prov.processDmaapMessageGet(TOPIC1, CONSUMER1, CONSUMER_ID1, 3, 300L));
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }).start();
+
+ Response resp = respQueue.poll(3, TimeUnit.SECONDS);
+ assertNotNull(resp);
+
+ assertEquals(Status.GONE.getStatusCode(), resp.getStatus());
+ assertEquals("[]", resp.getEntity().toString());
+ }
+
+ @Test
+ public void testSweepTopicTaskRun() {
+ prov.start();
+ prov.processDmaapMessageGet(TOPIC1, CONSUMER1, CONSUMER_ID1, 0, 0);
+ prov.processDmaapMessageGet(TOPIC2, CONSUMER1, CONSUMER_ID1, 0, 0);
+
+ ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
+ verify(timer).scheduleWithFixedDelay(captor.capture(), anyLong(), anyLong(), any(TimeUnit.class));
+
+ captor.getValue().run();
+ verify(data1).removeIdleConsumers();
+ verify(data2).removeIdleConsumers();
+
+ // run it again
+ captor.getValue().run();
+ verify(data1, times(2)).removeIdleConsumers();
+ verify(data2, times(2)).removeIdleConsumers();
+ }
+
+ @Test
+ public void testMakeTimerPool() {
+ // use a real provider so we can test the real makeTimer() method
+ DmaapSimProvider prov2 = new DmaapSimProvider(params);
+ prov2.start();
+ prov2.stop();
+ }
+
+ @Test
+ public void testMakeTopicData() {
+ // use a real provider so we can test the real makeTopicData() method
+ DmaapSimProvider prov2 = new DmaapSimProvider(params);
+ prov2.processDmaapMessageGet(TOPIC1, CONSUMER1, CONSUMER_ID1, 0, 0);
+ }
+
+ @Test
+ public void testGetInstance_testSetInstance() {
+ DmaapSimProvider.setInstance(prov);
+ assertSame(prov, DmaapSimProvider.getInstance());
+
+ DmaapSimProvider.setInstance(null);
+ assertNull(DmaapSimProvider.getInstance());
+ }
+
+
+ public class MyProvider extends DmaapSimProvider {
+
+ public MyProvider(DmaapSimParameterGroup params) {
+ super(params);
+ }
+
+ @Override
+ protected ScheduledExecutorService makeTimerPool() {
+ return timer;
+ }
+
+ @Override
+ protected TopicData makeTopicData(String topicName) {
+ switch (topicName) {
+ case TOPIC1:
+ return data1;
+ case TOPIC2:
+ return data2;
+ default:
+ throw new IllegalArgumentException("unknown topic name: " + topicName);
+ }
+ }
+ }
+}
diff --git a/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/models/sim/dmaap/provider/TopicDataTest.java b/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/models/sim/dmaap/provider/TopicDataTest.java
new file mode 100644
index 000000000..f7e1f5e6c
--- /dev/null
+++ b/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/models/sim/dmaap/provider/TopicDataTest.java
@@ -0,0 +1,213 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP Policy Models
+ * ================================================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.models.sim.dmaap.provider;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.stream.Collectors;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.internal.util.reflection.Whitebox;
+import org.onap.policy.common.utils.coder.Coder;
+import org.onap.policy.common.utils.coder.CoderException;
+import org.onap.policy.common.utils.coder.StandardCoder;
+
+public class TopicDataTest {
+ private static final String EXPECTED_EXCEPTION = "expected exception";
+ private static final String GROUP1 = "group-A";
+ private static final String GROUP2 = "group-B";
+ private static final String GROUP3 = "group-C";
+
+ private TopicData data;
+ private ConsumerGroupData consgrp1;
+ private ConsumerGroupData consgrp2;
+ private ConsumerGroupData consgrp3;
+ private List<ConsumerGroupData> groups;
+
+ /**
+ * Sets up mocks and the initial data object.
+ *
+ * @throws Exception if an error occurs
+ */
+ @Before
+ public void setUp() throws Exception {
+ consgrp1 = mock(ConsumerGroupData.class);
+ consgrp2 = mock(ConsumerGroupData.class);
+ consgrp3 = mock(ConsumerGroupData.class);
+
+ when(consgrp1.read(anyInt(), anyLong())).thenReturn(Collections.emptyList());
+ when(consgrp2.read(anyInt(), anyLong())).thenReturn(Collections.emptyList());
+ when(consgrp3.read(anyInt(), anyLong())).thenReturn(Collections.emptyList());
+
+ groups = new LinkedList<>(Arrays.asList(consgrp1, consgrp2, consgrp3));
+
+ data = new TopicData("my-topic") {
+ @Override
+ protected ConsumerGroupData makeData(String consumerGroup) {
+ return groups.remove(0);
+ }
+ };
+ }
+
+ @Test
+ public void testRemoveIdleConsumers() throws Exception {
+ // force two consumers into the map
+ data.read(GROUP1, 0, 0);
+ data.read(GROUP2, 0, 0);
+ data.read(GROUP3, 0, 0);
+
+ // indicate that one should be removed
+ when(consgrp1.shouldRemove()).thenReturn(true);
+
+ // sweep
+ data.removeIdleConsumers();
+
+ assertEquals("[group-B, group-C]", new TreeSet<>(getGroups().keySet()).toString());
+
+ // indicate that the others should be removed
+ when(consgrp2.shouldRemove()).thenReturn(true);
+ when(consgrp3.shouldRemove()).thenReturn(true);
+
+ // sweep
+ data.removeIdleConsumers();
+
+ assertTrue(getGroups().isEmpty());
+ }
+
+ @Test
+ public void testRead() throws Exception {
+ List<String> lst = Collections.emptyList();
+
+ when(consgrp1.read(anyInt(), anyLong())).thenReturn(ConsumerGroupData.UNREADABLE_LIST)
+ .thenReturn(ConsumerGroupData.UNREADABLE_LIST).thenReturn(lst);
+
+ assertSame(lst, data.read(GROUP1, 10, 20));
+
+ // should have invoked three times
+ verify(consgrp1, times(3)).read(anyInt(), anyLong());
+
+ // should have used the given values
+ verify(consgrp1, times(3)).read(10, 20);
+
+ // should not have allocated more than one group
+ assertEquals(2, groups.size());
+ }
+
+ @Test
+ public void testRead_MultipleGroups() throws Exception {
+ List<String> lst1 = Collections.emptyList();
+ when(consgrp1.read(anyInt(), anyLong())).thenReturn(lst1);
+
+ List<String> lst2 = Collections.emptyList();
+ when(consgrp2.read(anyInt(), anyLong())).thenReturn(lst2);
+
+ // one from each group
+ assertSame(lst1, data.read(GROUP1, 0, 0));
+ assertSame(lst2, data.read(GROUP2, 0, 0));
+
+ // repeat
+ assertSame(lst1, data.read(GROUP1, 0, 0));
+ assertSame(lst2, data.read(GROUP2, 0, 0));
+
+ // again
+ assertSame(lst1, data.read(GROUP1, 0, 0));
+ assertSame(lst2, data.read(GROUP2, 0, 0));
+
+ // should still have group3 in the list
+ assertEquals(1, groups.size());
+ }
+
+ @Test
+ public void testWrite() throws Exception {
+ // no groups yet
+ List<Object> messages = Arrays.asList("hello", "world");
+ data.write(messages);
+
+ // add two groups
+ data.read(GROUP1, 0, 0);
+ data.read(GROUP2, 0, 0);
+
+ data.write(messages);
+
+ // should have been written to both groups
+ List<String> strings = messages.stream().map(Object::toString).collect(Collectors.toList());
+ verify(consgrp1).write(strings);
+ verify(consgrp2).write(strings);
+ }
+
+ @Test
+ public void testConvertMessagesToStrings() {
+ assertEquals("[abc, 200]", data.convertMessagesToStrings(Arrays.asList("abc", null, 200)).toString());
+ }
+
+ @Test
+ public void testConvertMessageToString() throws CoderException {
+ Coder coder = new StandardCoder();
+
+ assertNull(data.convertMessageToString(null, coder));
+ assertEquals("text-msg", data.convertMessageToString("text-msg", coder));
+ assertEquals("100", data.convertMessageToString(100, coder));
+
+ coder = mock(Coder.class);
+ when(coder.encode(any())).thenThrow(new CoderException(EXPECTED_EXCEPTION));
+ assertNull(data.convertMessageToString(new TreeMap<String,Object>(), coder));
+ }
+
+ @Test
+ public void testMakeData() throws Exception {
+ // use real objects instead of mocks
+ TopicData data2 = new TopicData("real-data-topic");
+
+ // force a group into the topic
+ data2.read(GROUP1, 0, 0);
+
+ data2.write(Arrays.asList("abc", "def", "ghi"));
+
+ assertEquals("[abc, def]", data2.read(GROUP1, 2, 0).toString());
+ }
+
+ /**
+ * Gets the consumer group map from the topic data object.
+ *
+ * @return the topic's consumer group map
+ */
+ @SuppressWarnings("unchecked")
+ private Map<String, ConsumerGroupData> getGroups() {
+ return (Map<String, ConsumerGroupData>) Whitebox.getInternalState(data, "group2data");
+ }
+}
diff --git a/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/models/sim/dmaap/rest/BaseRestControllerV1Test.java b/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/models/sim/dmaap/rest/BaseRestControllerV1Test.java
new file mode 100644
index 000000000..5cba78355
--- /dev/null
+++ b/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/models/sim/dmaap/rest/BaseRestControllerV1Test.java
@@ -0,0 +1,63 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP PAP
+ * ================================================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.models.sim.dmaap.rest;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+import java.util.UUID;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.Response.ResponseBuilder;
+import org.junit.Before;
+import org.junit.Test;
+
+public class BaseRestControllerV1Test {
+
+ private BaseRestControllerV1 ctlr;
+ private ResponseBuilder bldr;
+
+ @Before
+ public void setUp() {
+ ctlr = new BaseRestControllerV1();
+ bldr = Response.status(Response.Status.OK);
+ }
+
+ @Test
+ public void testAddVersionControlHeaders() {
+ Response resp = ctlr.addVersionControlHeaders(bldr).build();
+ assertEquals("0", resp.getHeaderString(BaseRestControllerV1.VERSION_MINOR_NAME));
+ assertEquals("0", resp.getHeaderString(BaseRestControllerV1.VERSION_PATCH_NAME));
+ assertEquals("1.0.0", resp.getHeaderString(BaseRestControllerV1.VERSION_LATEST_NAME));
+ }
+
+ @Test
+ public void testAddLoggingHeaders_Null() {
+ Response resp = ctlr.addLoggingHeaders(bldr, null).build();
+ assertNotNull(resp.getHeaderString(BaseRestControllerV1.REQUEST_ID_NAME));
+ }
+
+ @Test
+ public void testAddLoggingHeaders_NonNull() {
+ UUID uuid = UUID.randomUUID();
+ Response resp = ctlr.addLoggingHeaders(bldr, uuid).build();
+ assertEquals(uuid.toString(), resp.getHeaderString(BaseRestControllerV1.REQUEST_ID_NAME));
+ }
+}
diff --git a/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/models/sim/dmaap/rest/CambriaMessageBodyHandlerTest.java b/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/models/sim/dmaap/rest/CambriaMessageBodyHandlerTest.java
new file mode 100644
index 000000000..5d9186c75
--- /dev/null
+++ b/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/models/sim/dmaap/rest/CambriaMessageBodyHandlerTest.java
@@ -0,0 +1,145 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP Policy Models
+ * ================================================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.models.sim.dmaap.rest;
+
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.ByteArrayInputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import javax.ws.rs.core.MediaType;
+import org.junit.Before;
+import org.junit.Test;
+
+public class CambriaMessageBodyHandlerTest {
+ private static final String STD_INPUT = "1.3.XAbc";
+ private static final String EXPECTED_OUTPUT = "[Abc]";
+
+ private CambriaMessageBodyHandler hdlr;
+
+ @Before
+ public void setUp() {
+ hdlr = new CambriaMessageBodyHandler();
+ }
+
+ @Test
+ public void testIsReadable() {
+ assertTrue(hdlr.isReadable(null, null, null, MediaType.valueOf("application/cambria")));
+
+ assertFalse(hdlr.isReadable(null, null, null, null));
+ assertFalse(hdlr.isReadable(null, null, null, MediaType.valueOf("application/other")));
+ assertFalse(hdlr.isReadable(null, null, null, MediaType.valueOf("other/cambria")));
+ }
+
+ @Test
+ public void testReadFrom() throws IOException {
+ List<Object> lst = readStream("1.11.AMessageBody", "3.3.123Foo3.3.123Bar", "0.16.You can do that..8.Or that.");
+ assertEquals("[MessageBody, Foo, Bar, You can do that., Or that.]", lst.toString());
+
+ // empty stream
+ lst = readStream();
+ assertEquals("[]", lst.toString());
+ }
+
+ @Test
+ public void testReadMessage_InvalidPartitionLength() {
+ assertThatThrownBy(() -> readStream("100000000.3.")).isInstanceOf(IOException.class)
+ .hasMessage("invalid partition length");
+ }
+
+ @Test
+ public void testReadMessage_InvalidMessageLength() {
+ assertThatThrownBy(() -> readStream("3.100000000.ABC")).isInstanceOf(IOException.class)
+ .hasMessage("invalid message length");
+ }
+
+ @Test
+ public void testSkipWhitespace() throws IOException {
+ // no white space
+ assertEquals(EXPECTED_OUTPUT, readStream(STD_INPUT).toString());
+
+ // single white space
+ assertEquals(EXPECTED_OUTPUT, readStream(" " + STD_INPUT).toString());
+
+ // multiple white spaces
+ assertEquals(EXPECTED_OUTPUT, readStream("\n\n\t" + STD_INPUT).toString());
+ }
+
+ @Test
+ public void testReadLength_NoDigits() throws IOException {
+ assertEquals("[]", readStream("..").toString());
+ }
+
+ @Test
+ public void testReadLength_NoDot() {
+ assertThatThrownBy(() -> readStream("3.2")).isInstanceOf(EOFException.class)
+ .hasMessage("missing '.' in 'length' field");
+ }
+
+ @Test
+ public void testReadLength_NonDigit() {
+ assertThatThrownBy(() -> readStream("3.2x.ABCde")).isInstanceOf(IOException.class)
+ .hasMessage("invalid character in 'length' field");
+ }
+
+ @Test
+ public void testReadLength_TooManyDigits() {
+ assertThatThrownBy(() -> readStream("3.12345678901234567890.ABCde")).isInstanceOf(IOException.class)
+ .hasMessage("too many digits in 'length' field");
+ }
+
+ @Test
+ public void testReadString_ZeroLength() throws IOException {
+ assertEquals("[]", readStream("1..X").toString());
+ }
+
+ @Test
+ public void testReadString_TooShort() {
+ assertThatThrownBy(() -> readStream(".5.me")).isInstanceOf(EOFException.class).hasMessageContaining("actual");
+ }
+
+ /**
+ * Reads a stream via the handler.
+ *
+ * @param text lines of text to be read
+ * @return the list of objects that were decoded from the stream
+ * @throws IOException if an error occurs
+ */
+ private List<Object> readStream(String... text) throws IOException {
+ return hdlr.readFrom(null, null, null, null, null, makeStream(text));
+ }
+
+ /**
+ * Creates an input stream from lines of text.
+ *
+ * @param text lines of text
+ * @return an input stream
+ */
+ private InputStream makeStream(String... text) {
+ return new ByteArrayInputStream(String.join("\n", text).getBytes(StandardCharsets.UTF_8));
+ }
+}
diff --git a/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/models/sim/dmaap/rest/CommonRestServer.java b/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/models/sim/dmaap/rest/CommonRestServer.java
new file mode 100644
index 000000000..7e30c5a4c
--- /dev/null
+++ b/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/models/sim/dmaap/rest/CommonRestServer.java
@@ -0,0 +1,181 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Modifications Copyright (C) 2019 AT&T Intellectual Property.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.models.sim.dmaap.rest;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.nio.charset.StandardCharsets;
+import javax.ws.rs.client.Client;
+import javax.ws.rs.client.ClientBuilder;
+import javax.ws.rs.client.Invocation;
+import javax.ws.rs.client.WebTarget;
+import javax.ws.rs.core.MediaType;
+import lombok.Getter;
+import org.glassfish.jersey.client.ClientProperties;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.onap.policy.common.endpoints.http.server.HttpServletServerFactoryInstance;
+import org.onap.policy.common.gson.GsonMessageBodyHandler;
+import org.onap.policy.common.utils.network.NetworkUtil;
+import org.onap.policy.common.utils.services.Registry;
+import org.onap.policy.models.sim.dmaap.DmaapSimException;
+import org.onap.policy.models.sim.dmaap.startstop.Main;
+import org.onap.policy.sim.dmaap.parameters.CommonTestData;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Common base class for rest server tests.
+ */
+public class CommonRestServer {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(CommonRestServer.class);
+
+ public static final String NOT_ALIVE = "not alive";
+ public static final String ALIVE = "alive";
+ public static final String SELF = "self";
+ public static final String NAME = "DMaaP Simulator";
+ public static final String ENDPOINT_PREFIX = "events/";
+
+ @Getter
+ private static int port;
+
+ protected static String httpPrefix;
+
+ private static Main main;
+
+ /**
+ * Allocates a port for the server, writes a config file, and then starts Main.
+ *
+ * @throws Exception if an error occurs
+ */
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ port = NetworkUtil.allocPort();
+
+ httpPrefix = "http://localhost:" + port + "/";
+
+ String json = new CommonTestData().getParameterGroupAsString(port);
+ makeConfigFile("src/test/resources/parameters/TestConfigParams.json", json);
+
+ HttpServletServerFactoryInstance.getServerFactory().destroy();
+
+ startMain();
+ }
+
+ /**
+ * Stops Main.
+ */
+ @AfterClass
+ public static void teardownAfterClass() {
+ try {
+ if (main != null) {
+ Main main2 = main;
+ main = null;
+
+ main2.shutdown();
+ }
+
+ } catch (DmaapSimException exp) {
+ LOGGER.error("cannot stop main", exp);
+ }
+ }
+
+ /**
+ * Set up.
+ *
+ * @throws Exception if an error occurs
+ */
+ @Before
+ public void setUp() throws Exception {
+ // restart, if not currently running
+ if (main == null) {
+ startMain();
+ }
+ }
+
+ /**
+ * Makes a parameter configuration file.
+ * @param fileName name of the config file to be created
+ * @param json json to be written to the file
+ *
+ * @throws Exception if an error occurs
+ */
+ protected static void makeConfigFile(String fileName, String json) throws Exception {
+ File file = new File(fileName);
+ file.deleteOnExit();
+
+ try (FileOutputStream output = new FileOutputStream(file)) {
+ output.write(json.getBytes(StandardCharsets.UTF_8));
+ }
+ }
+
+ /**
+ * Starts the "Main".
+ *
+ * @throws Exception if an error occurs
+ */
+ private static void startMain() throws Exception {
+ Registry.newRegistry();
+
+ // make sure port is available
+ if (NetworkUtil.isTcpPortOpen("localhost", port, 1, 1L)) {
+ throw new IllegalStateException("port " + port + " is still in use");
+ }
+
+ final String[] simConfigParameters = {"-c", "src/test/resources/parameters/TestConfigParams.json"};
+
+ main = new Main(simConfigParameters);
+
+ if (!NetworkUtil.isTcpPortOpen("localhost", port, 60, 1000L)) {
+ throw new IllegalStateException("server is not listening on port " + port);
+ }
+ }
+
+ /**
+ * Sends a request to an endpoint.
+ *
+ * @param endpoint the target endpoint
+ * @return a request builder
+ * @throws Exception if an error occurs
+ */
+ protected Invocation.Builder sendRequest(final String endpoint) throws Exception {
+ return sendFqeRequest(httpPrefix + ENDPOINT_PREFIX + endpoint);
+ }
+
+ /**
+ * Sends a request to a fully qualified endpoint.
+ *
+ * @param fullyQualifiedEndpoint the fully qualified target endpoint
+ * @return a request builder
+ */
+ protected Invocation.Builder sendFqeRequest(final String fullyQualifiedEndpoint) {
+ final Client client = ClientBuilder.newBuilder().build();
+
+ client.property(ClientProperties.METAINF_SERVICES_LOOKUP_DISABLE, "true");
+ client.register(GsonMessageBodyHandler.class);
+
+ final WebTarget webTarget = client.target(fullyQualifiedEndpoint);
+
+ return webTarget.request(MediaType.APPLICATION_JSON);
+ }
+}
diff --git a/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/models/sim/dmaap/rest/DmaapSimRestControllerV1Test.java b/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/models/sim/dmaap/rest/DmaapSimRestControllerV1Test.java
new file mode 100644
index 000000000..7b84d543d
--- /dev/null
+++ b/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/models/sim/dmaap/rest/DmaapSimRestControllerV1Test.java
@@ -0,0 +1,94 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.models.sim.dmaap.rest;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.Map;
+import javax.ws.rs.core.Response;
+import org.junit.Before;
+import org.junit.Test;
+import org.onap.policy.common.utils.coder.Coder;
+import org.onap.policy.common.utils.coder.CoderException;
+import org.onap.policy.common.utils.coder.StandardCoder;
+import org.onap.policy.models.sim.dmaap.parameters.DmaapSimParameterGroup;
+import org.onap.policy.models.sim.dmaap.provider.DmaapSimProvider;
+
+public class DmaapSimRestControllerV1Test {
+ private static final int LIMIT = 5;
+ private static final String TOPIC = "my-topic";
+ private static final String TOPIC2 = "my-topic-B";
+ private static final String MESSAGE = "my-message";
+ private static final String MESSAGE2 = "my-message-B";
+ private static final String CONSUMER = "my-consumer";
+ private static final String CONSUMER_ID = "my-id";
+
+ private static Coder coder = new StandardCoder();
+
+ private DmaapSimRestControllerV1 rest;
+
+ /**
+ * Creates the controller.
+ *
+ * @throws CoderException if the parameters cannot be loaded
+ */
+ @Before
+ public void setUp() throws CoderException {
+ DmaapSimParameterGroup params = coder.decode(new File("src/test/resources/parameters/NormalParameters.json"),
+ DmaapSimParameterGroup.class);
+ DmaapSimProvider.setInstance(new DmaapSimProvider(params));
+ rest = new DmaapSimRestControllerV1();
+ }
+
+ @Test
+ public void test() {
+ Response resp = rest.getDmaapMessage(TOPIC, CONSUMER, CONSUMER_ID, LIMIT, 0);
+ assertEquals(Response.Status.OK.getStatusCode(), resp.getStatus());
+ assertEquals("[]", resp.getEntity().toString());
+
+ // add some messages
+ resp = rest.postDmaapMessage(TOPIC, Arrays.asList(MESSAGE, MESSAGE2));
+ assertEquals(Response.Status.OK.getStatusCode(), resp.getStatus());
+ assertEquals(2, getCount(resp));
+
+ resp = rest.postDmaapMessage(TOPIC2, Arrays.asList(MESSAGE, MESSAGE2, MESSAGE));
+ assertEquals(Response.Status.OK.getStatusCode(), resp.getStatus());
+ assertEquals(3, getCount(resp));
+
+ // hadn't registered with topic 2 so nothing expected from there
+ resp = rest.getDmaapMessage(TOPIC2, CONSUMER, CONSUMER_ID, LIMIT, 0);
+ assertEquals(Response.Status.OK.getStatusCode(), resp.getStatus());
+ assertEquals("[]", resp.getEntity().toString());
+
+ // now read from topic 1
+ resp = rest.getDmaapMessage(TOPIC, CONSUMER, CONSUMER_ID, LIMIT, 0);
+ assertEquals(Response.Status.OK.getStatusCode(), resp.getStatus());
+ assertEquals("[my-message, my-message-B]", resp.getEntity().toString());
+ }
+
+ private int getCount(Response resp) {
+ @SuppressWarnings("unchecked")
+ Map<String, Object> map = (Map<String, Object>) resp.getEntity();
+
+ return (int) map.get("count");
+ }
+
+}
diff --git a/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/models/sim/dmaap/rest/TextMessageBodyHandlerTest.java b/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/models/sim/dmaap/rest/TextMessageBodyHandlerTest.java
new file mode 100644
index 000000000..2dfbae980
--- /dev/null
+++ b/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/models/sim/dmaap/rest/TextMessageBodyHandlerTest.java
@@ -0,0 +1,81 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.models.sim.dmaap.rest;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import javax.ws.rs.core.MediaType;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TextMessageBodyHandlerTest {
+ private TextMessageBodyHandler hdlr;
+
+ @Before
+ public void setUp() {
+ hdlr = new TextMessageBodyHandler();
+ }
+
+ @Test
+ public void testIsReadable() {
+ assertTrue(hdlr.isReadable(null, null, null, MediaType.valueOf("text/plain")));
+
+ assertFalse(hdlr.isReadable(null, null, null, null));
+ assertFalse(hdlr.isReadable(null, null, null, MediaType.valueOf("text/other")));
+ assertFalse(hdlr.isReadable(null, null, null, MediaType.valueOf("other/plain")));
+ }
+
+ @Test
+ public void testReadFrom() throws IOException {
+ List<Object> lst = readStream("hello", "world");
+ assertEquals("[hello, world]", lst.toString());
+
+ // empty stream
+ lst = readStream();
+ assertEquals("[]", lst.toString());
+ }
+
+ /**
+ * Reads a stream via the handler.
+ *
+ * @param text lines of text to be read
+ * @return the list of objects that were decoded from the stream
+ * @throws IOException if an error occurs
+ */
+ private List<Object> readStream(String... text) throws IOException {
+ return hdlr.readFrom(null, null, null, null, null, makeStream(text));
+ }
+
+ /**
+ * Creates an input stream from lines of text.
+ *
+ * @param text lines of text
+ * @return an input stream
+ */
+ private InputStream makeStream(String... text) {
+ return new ByteArrayInputStream(String.join("\n", text).getBytes(StandardCharsets.UTF_8));
+ }
+}
diff --git a/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/sim/dmaap/e2e/EndToEndTest.java b/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/sim/dmaap/e2e/EndToEndTest.java
new file mode 100644
index 000000000..8c35de64e
--- /dev/null
+++ b/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/sim/dmaap/e2e/EndToEndTest.java
@@ -0,0 +1,199 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.sim.dmaap.e2e;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.File;
+import java.io.PrintWriter;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiConsumer;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager;
+import org.onap.policy.common.endpoints.event.comm.TopicSink;
+import org.onap.policy.common.endpoints.parameters.TopicParameterGroup;
+import org.onap.policy.common.endpoints.parameters.TopicParameters;
+import org.onap.policy.common.utils.coder.CoderException;
+import org.onap.policy.common.utils.coder.StandardCoder;
+import org.onap.policy.models.sim.dmaap.rest.CommonRestServer;
+
+/**
+ * This tests the simulator using dmaap endpoints to verify that it works from publisher
+ * to subscriber.
+ */
+public class EndToEndTest extends CommonRestServer {
+ private static final int MAX_WAIT_SEC = 5;
+ private static final String ORIG_TOPIC = "MY-TOPIC";
+ private static final String ORIG_TOPIC2 = "MY-TOPIC-B";
+ private static final int MAX_MSG = 200;
+
+ private static int ntests = 0;
+ private static String topicJson;
+
+ private TopicParameterGroup topicConfig;
+
+ private String topic = "MY-TOPIC";
+ private String topic2 = "MY-TOPIC-B";
+
+ /**
+ * Messages from the topic are placed here by the endpoint.
+ */
+ private BlockingQueue<String> queue;
+
+ /**
+ * Messages from topic 2 are placed here by the endpoint.
+ */
+ private BlockingQueue<String> queue2;
+
+ /**
+ * Starts the rest server.
+ *
+ * @throws Exception if an error occurs
+ */
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ TopicEndpointManager.getManager().shutdown();
+
+ CommonRestServer.setUpBeforeClass();
+
+ topicJson = new String(
+ Files.readAllBytes(new File("src/test/resources/parameters/TopicParameters.json").toPath()),
+ StandardCharsets.UTF_8);
+ topicJson = topicJson.replace("${port}", String.valueOf(getPort()));
+ }
+
+ /**
+ * Starts the topics.
+ *
+ * @throws CoderException if the parameters cannot be decoded
+ */
+ @Before
+ @Override
+ public void setUp() throws CoderException {
+ queue = new LinkedBlockingQueue<>();
+ queue2 = new LinkedBlockingQueue<>();
+
+ /*
+ * change topic names for each test so any listeners that may still exist will not
+ * grab new messages
+ */
+
+ ++ntests;
+ topic = "my-topic-" + ntests;
+ topic2 = "my-topic-b" + ntests;
+
+ String json = topicJson.replace(ORIG_TOPIC2, topic2).replace(ORIG_TOPIC, topic);
+
+ topicConfig = new StandardCoder().decode(json, TopicParameterGroup.class);
+
+ TopicEndpointManager.getManager().addTopics(topicConfig);
+ TopicEndpointManager.getManager().start();
+ }
+
+ @After
+ public void tearDown() {
+ TopicEndpointManager.getManager().shutdown();
+ }
+
+ @Test
+ public void testWithTopicEndpointAtEachEnd() throws InterruptedException {
+ // register listeners to add events to appropriate queue
+ TopicEndpointManager.getManager().getDmaapTopicSource(topic)
+ .register((infra, topic, event) -> queue.add(event));
+ TopicEndpointManager.getManager().getDmaapTopicSource(topic2)
+ .register((infra, topic, event) -> queue2.add(event));
+
+ // publish events
+ TopicSink sink = TopicEndpointManager.getManager().getDmaapTopicSink(topic);
+ TopicSink sink2 = TopicEndpointManager.getManager().getDmaapTopicSink(topic2);
+ for (int x = 0; x < MAX_MSG; ++x) {
+ sink.send("hello-" + x);
+ sink2.send("world-" + x);
+ }
+
+ // verify events where received
+ for (int x = 0; x < MAX_MSG; ++x) {
+ assertEquals("message " + x, "hello-" + x, queue.poll(MAX_WAIT_SEC, TimeUnit.SECONDS));
+ assertEquals("message " + x, "world-" + x, queue2.poll(MAX_WAIT_SEC, TimeUnit.SECONDS));
+ }
+ }
+
+ @Test
+ public void testCambriaFormat() throws Exception {
+ test("testCambriaFormat", "application/cambria",
+ (wtr, messages) -> messages.forEach(msg -> wtr.write("0." + msg.length() + "." + msg + "\n")));
+ }
+
+ @Test
+ public void testJson() throws Exception {
+ test("testJson", "application/json", (wtr, messages) -> wtr.write("[" + String.join(", ", messages) + "]"));
+ }
+
+ @Test
+ public void testText() throws Exception {
+ test("testText", "text/plain", (wtr, messages) -> messages.forEach(wtr::println));
+ }
+
+ /**
+ * Uses a raw URL connection to ensure the server can process messages of the given
+ * media type.
+ *
+ * @param testName name of the test
+ * @param mediaType media type
+ * @param writeMessages function that writes messages to a PrintWriter
+ * @throws Exception if an error occurs
+ */
+ private void test(String testName, String mediaType, BiConsumer<PrintWriter, List<String>> writeMessages)
+ throws Exception {
+ String msg1 = "{'abc':10.0}".replace('\'', '"');
+ String msg2 = "{'def':20.0}".replace('\'', '"');
+
+ TopicEndpointManager.getManager().getDmaapTopicSource(topic)
+ .register((infra, topic, event) -> queue.add(event));
+
+ TopicParameters sinkcfg = topicConfig.getTopicSinks().get(0);
+ URL url = new URL(httpPrefix + "events/" + sinkcfg.getTopic());
+
+ HttpURLConnection conn = (HttpURLConnection) url.openConnection();
+ conn.setRequestMethod("POST");
+ conn.setRequestProperty("Content-type", mediaType);
+ conn.setDoOutput(true);
+ conn.connect();
+
+ try (PrintWriter wtr = new PrintWriter(conn.getOutputStream())) {
+ writeMessages.accept(wtr, Arrays.asList(msg1, msg2));
+ }
+
+ assertEquals(testName + " response code", HttpURLConnection.HTTP_OK, conn.getResponseCode());
+
+ assertEquals(testName + " message 1", msg1, queue.poll(MAX_WAIT_SEC, TimeUnit.SECONDS));
+ assertEquals(testName + " message 2", msg2, queue.poll(MAX_WAIT_SEC, TimeUnit.SECONDS));
+ }
+}
diff --git a/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/sim/dmaap/parameters/CommonTestData.java b/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/sim/dmaap/parameters/CommonTestData.java
new file mode 100644
index 000000000..21d9ed604
--- /dev/null
+++ b/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/sim/dmaap/parameters/CommonTestData.java
@@ -0,0 +1,89 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Modifications Copyright (C) 2019 AT&T Intellectual Property.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.sim.dmaap.parameters;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import org.onap.policy.common.utils.coder.Coder;
+import org.onap.policy.common.utils.coder.CoderException;
+import org.onap.policy.common.utils.coder.StandardCoder;
+import org.onap.policy.models.sim.dmaap.DmaapSimRuntimeException;
+import org.onap.policy.models.sim.dmaap.parameters.DmaapSimParameterGroup;
+
+/**
+ * Class to hold/create all parameters for test cases.
+ */
+public class CommonTestData {
+ public static final String SIM_GROUP_NAME = "DMaapSim";
+
+ private static final Coder coder = new StandardCoder();
+
+ /**
+ * Gets the standard simulator parameters.
+ *
+ * @param port port to be inserted into the parameters
+ * @return the standard simulator parameters
+ */
+ public DmaapSimParameterGroup getParameterGroup(int port) {
+ try {
+ return coder.decode(getParameterGroupAsString(port), DmaapSimParameterGroup.class);
+
+ } catch (CoderException e) {
+ throw new DmaapSimRuntimeException("cannot read simulator parameters", e);
+ }
+ }
+
+ /**
+ * Gets the standard simulator parameters, as a String.
+ *
+ * @param port port to be inserted into the parameters
+ * @return the standard simulator parameters
+ */
+ public String getParameterGroupAsString(int port) {
+
+ try {
+ File file = new File("src/test/resources/parameters/NormalParameters.json");
+ String json = new String(Files.readAllBytes(file.toPath()), StandardCharsets.UTF_8);
+
+ json = json.replace("6845", String.valueOf(port));
+
+ return json;
+
+ } catch (IOException e) {
+ throw new DmaapSimRuntimeException("cannot read simulator parameters", e);
+ }
+ }
+
+ /**
+ * Nulls out a field within a JSON string. It does it by adding a field with the same
+ * name, having a null value, and then prefixing the original field name with "Xxx",
+ * thus causing the original field and value to be ignored.
+ *
+ * @param json JSON string
+ * @param field field to be nulled out
+ * @return a new JSON string with the field nulled out
+ */
+ public String nullifyField(String json, String field) {
+ return json.replace(field + "\"", field + "\":null, \"" + field + "Xxx\"");
+ }
+}
diff --git a/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/sim/dmaap/parameters/DmaapSimParameterGroupTest.java b/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/sim/dmaap/parameters/DmaapSimParameterGroupTest.java
new file mode 100644
index 000000000..828cd89b0
--- /dev/null
+++ b/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/sim/dmaap/parameters/DmaapSimParameterGroupTest.java
@@ -0,0 +1,34 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.sim.dmaap.parameters;
+
+import static org.junit.Assert.assertEquals;
+
+import org.junit.Test;
+import org.onap.policy.models.sim.dmaap.parameters.DmaapSimParameterGroup;
+
+public class DmaapSimParameterGroupTest {
+ private static final String MY_NAME = "my-name";
+
+ @Test
+ public void testDmaapSimParameterGroup() {
+ DmaapSimParameterGroup params = new DmaapSimParameterGroup(MY_NAME);
+ assertEquals(MY_NAME, params.getName());
+ }
+}
diff --git a/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/sim/dmaap/parameters/DmaapSimParameterHandlerTest.java b/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/sim/dmaap/parameters/DmaapSimParameterHandlerTest.java
new file mode 100644
index 000000000..8f053d219
--- /dev/null
+++ b/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/sim/dmaap/parameters/DmaapSimParameterHandlerTest.java
@@ -0,0 +1,70 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.sim.dmaap.parameters;
+
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.onap.policy.models.sim.dmaap.DmaapSimException;
+import org.onap.policy.models.sim.dmaap.parameters.DmaapSimParameterGroup;
+import org.onap.policy.models.sim.dmaap.parameters.DmaapSimParameterHandler;
+import org.onap.policy.models.sim.dmaap.startstop.DmaapSimCommandLineArguments;
+
+public class DmaapSimParameterHandlerTest {
+
+ private static final String RESOURCE_DIR = "src/test/resources/parameters/";
+
+ private DmaapSimParameterHandler handler;
+
+ @Before
+ public void setUp() {
+ handler = new DmaapSimParameterHandler();
+ }
+
+ @Test
+ public void testGetParameters() throws DmaapSimException {
+ final DmaapSimCommandLineArguments args = new DmaapSimCommandLineArguments();
+
+ args.parse(new String[] {"-c", RESOURCE_DIR + "NormalParameters.json"});
+ DmaapSimParameterGroup params = handler.getParameters(args);
+ assertNotNull(params);
+ assertEquals("DMaapSim", params.getName());
+ assertEquals(300L, params.getTopicSweepSec());
+ assertEquals(6845, params.getRestServerParameters().getPort());
+
+
+ args.parse(new String[] {"-c", "FileNotFound.json"});
+ assertThatThrownBy(() -> handler.getParameters(args)).isInstanceOf(DmaapSimException.class)
+ .hasMessageStartingWith("error reading parameters");
+
+
+ args.parse(new String[] {"-c", RESOURCE_DIR + "EmptyParameterFile.json"});
+ assertThatThrownBy(() -> handler.getParameters(args)).isInstanceOf(DmaapSimException.class)
+ .hasMessageStartingWith("no parameters found");
+
+
+ args.parse(new String[] {"-c", RESOURCE_DIR + "Parameters_InvalidName.json"});
+ assertThatThrownBy(() -> handler.getParameters(args)).isInstanceOf(DmaapSimException.class)
+ .hasMessageContaining("validation error");
+ }
+
+}
diff --git a/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/sim/dmaap/startstop/DmaapSimActivatorTest.java b/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/sim/dmaap/startstop/DmaapSimActivatorTest.java
new file mode 100644
index 000000000..380a72423
--- /dev/null
+++ b/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/sim/dmaap/startstop/DmaapSimActivatorTest.java
@@ -0,0 +1,95 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2019 AT&T Intellectual Property.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.sim.dmaap.startstop;
+
+import static org.assertj.core.api.Assertions.assertThatIllegalStateException;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.onap.policy.common.endpoints.http.server.HttpServletServerFactoryInstance;
+import org.onap.policy.common.utils.services.Registry;
+import org.onap.policy.models.sim.dmaap.parameters.DmaapSimParameterGroup;
+import org.onap.policy.models.sim.dmaap.parameters.DmaapSimParameterHandler;
+import org.onap.policy.models.sim.dmaap.startstop.DmaapSimActivator;
+import org.onap.policy.models.sim.dmaap.startstop.DmaapSimCommandLineArguments;
+
+
+/**
+ * Class to perform unit test of {@link DmaapSimActivator}}.
+ */
+public class DmaapSimActivatorTest {
+
+ private DmaapSimActivator activator;
+
+ /**
+ * Initializes an activator.
+ *
+ * @throws Exception if an error occurs
+ */
+ @Before
+ public void setUp() throws Exception {
+ Registry.newRegistry();
+ HttpServletServerFactoryInstance.getServerFactory().destroy();
+
+ final String[] papConfigParameters = {"-c", "parameters/NormalParameters.json"};
+ final DmaapSimCommandLineArguments arguments = new DmaapSimCommandLineArguments(papConfigParameters);
+ final DmaapSimParameterGroup parGroup = new DmaapSimParameterHandler().getParameters(arguments);
+
+ activator = new DmaapSimActivator(parGroup);
+ }
+
+ /**
+ * Method for cleanup after each test.
+ *
+ * @throws Exception if an error occurs
+ */
+ @After
+ public void teardown() throws Exception {
+ if (activator != null && activator.isAlive()) {
+ activator.stop();
+ }
+ }
+
+ @Test
+ public void testDmaapSimActivator() {
+ assertFalse(activator.isAlive());
+ activator.start();
+ assertTrue(activator.isAlive());
+
+ // repeat - should throw an exception
+ assertThatIllegalStateException().isThrownBy(() -> activator.start());
+ assertTrue(activator.isAlive());
+ }
+
+ @Test
+ public void testTerminate() {
+ activator.start();
+ activator.stop();
+ assertFalse(activator.isAlive());
+
+ // repeat - should throw an exception
+ assertThatIllegalStateException().isThrownBy(() -> activator.stop());
+ assertFalse(activator.isAlive());
+ }
+}
diff --git a/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/sim/dmaap/startstop/MainTest.java b/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/sim/dmaap/startstop/MainTest.java
new file mode 100644
index 000000000..b8e285a99
--- /dev/null
+++ b/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/sim/dmaap/startstop/MainTest.java
@@ -0,0 +1,100 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Modifications Copyright (C) 2019 AT&T Intellectual Property.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.sim.dmaap.startstop;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.onap.policy.common.endpoints.http.server.HttpServletServerFactoryInstance;
+import org.onap.policy.models.sim.dmaap.DmaapSimException;
+import org.onap.policy.models.sim.dmaap.startstop.Main;
+import org.onap.policy.sim.dmaap.parameters.CommonTestData;
+
+/**
+ * Class to perform unit test of {@link Main}}.
+ *
+ * @author Ram Krishna Verma (ram.krishna.verma@est.tech)
+ */
+public class MainTest {
+ private Main main;
+
+ /**
+ * Set up.
+ */
+ @Before
+ public void setUp() {
+ main = null;
+ HttpServletServerFactoryInstance.getServerFactory().destroy();
+ }
+
+ /**
+ * Shuts "main" down.
+ *
+ * @throws Exception if an error occurs
+ */
+ @After
+ public void tearDown() throws Exception {
+ if (main != null) {
+ main.shutdown();
+ }
+ }
+
+ @Test
+ public void testMain() throws DmaapSimException {
+ final String[] NormalParameters = {"-c", "parameters/NormalParameters.json"};
+ main = new Main(NormalParameters);
+ assertTrue(main.getParameters().isValid());
+ assertEquals(CommonTestData.SIM_GROUP_NAME, main.getParameters().getName());
+
+ main.shutdown();
+ }
+
+ @Test
+ public void testMain_NoArguments() {
+ final String[] NormalParameters = {};
+ main = new Main(NormalParameters);
+ assertTrue(main.getParameters() == null);
+ }
+
+ @Test
+ public void testMain_InvalidArguments() {
+ // note: this is missing the "-c" argument, thus the ARGUMENTS are invalid
+ final String[] NormalParameters = {"parameters/NormalParameters.json"};
+ main = new Main(NormalParameters);
+ assertTrue(main.getParameters() == null);
+ }
+
+ @Test
+ public void testMain_Help() {
+ final String[] NormalParameters = {"-h"};
+ Main.main(NormalParameters);
+ }
+
+ @Test
+ public void testMain_InvalidParameters() {
+ final String[] NormalParameters = {"-c", "parameters/InvalidParameters.json"};
+ main = new Main(NormalParameters);
+ assertTrue(main.getParameters() == null);
+ }
+}
diff --git a/models-sim/models-sim-dmaap/src/test/resources/parameters/EmptyParameterFile.json b/models-sim/models-sim-dmaap/src/test/resources/parameters/EmptyParameterFile.json
new file mode 100644
index 000000000..e69de29bb
--- /dev/null
+++ b/models-sim/models-sim-dmaap/src/test/resources/parameters/EmptyParameterFile.json
diff --git a/models-sim/models-sim-dmaap/src/test/resources/parameters/MinimumParameters.json b/models-sim/models-sim-dmaap/src/test/resources/parameters/MinimumParameters.json
index aeedf9d6e..a1c98a5b1 100644
--- a/models-sim/models-sim-dmaap/src/test/resources/parameters/MinimumParameters.json
+++ b/models-sim/models-sim-dmaap/src/test/resources/parameters/MinimumParameters.json
@@ -1,5 +1,6 @@
-{
+{
"name":"DMaapSim",
+ "topicSweepSec": 1,
"restServerParameters":{
"host":"0.0.0.0",
"port":6845
diff --git a/models-sim/models-sim-dmaap/src/test/resources/parameters/NormalParameters.json b/models-sim/models-sim-dmaap/src/test/resources/parameters/NormalParameters.json
index a2a036645..deec966e8 100644
--- a/models-sim/models-sim-dmaap/src/test/resources/parameters/NormalParameters.json
+++ b/models-sim/models-sim-dmaap/src/test/resources/parameters/NormalParameters.json
@@ -1,5 +1,6 @@
{
"name": "DMaapSim",
+ "topicSweepSec": 300,
"restServerParameters": {
"host": "0.0.0.0",
"port": 6845
diff --git a/models-sim/models-sim-dmaap/src/test/resources/parameters/Parameters_InvalidName.json b/models-sim/models-sim-dmaap/src/test/resources/parameters/Parameters_InvalidName.json
index fba033e52..51e9458b0 100644
--- a/models-sim/models-sim-dmaap/src/test/resources/parameters/Parameters_InvalidName.json
+++ b/models-sim/models-sim-dmaap/src/test/resources/parameters/Parameters_InvalidName.json
@@ -1,5 +1,6 @@
{
"name":" ",
+ "topicSweepSec": 1,
"restServerParameters":{
"host":"0.0.0.0",
"port":6969,
diff --git a/models-sim/models-sim-dmaap/src/test/resources/parameters/TopicParameters.json b/models-sim/models-sim-dmaap/src/test/resources/parameters/TopicParameters.json
new file mode 100644
index 000000000..77a320f6d
--- /dev/null
+++ b/models-sim/models-sim-dmaap/src/test/resources/parameters/TopicParameters.json
@@ -0,0 +1,36 @@
+{
+ "topicSources": [
+ {
+ "topic": "MY-TOPIC",
+ "servers": [
+ "localhost:${port}"
+ ],
+ "topicCommInfrastructure": "dmaap",
+ "fetchTimeout": 100
+ },
+ {
+ "topic": "MY-TOPIC-B",
+ "servers": [
+ "localhost:${port}"
+ ],
+ "topicCommInfrastructure": "dmaap",
+ "fetchTimeout": 100
+ }
+ ],
+ "topicSinks": [
+ {
+ "topic": "MY-TOPIC",
+ "servers": [
+ "localhost:${port}"
+ ],
+ "topicCommInfrastructure": "dmaap"
+ },
+ {
+ "topic": "MY-TOPIC-B",
+ "servers": [
+ "localhost:${port}"
+ ],
+ "topicCommInfrastructure": "dmaap"
+ }
+ ]
+} \ No newline at end of file