diff options
Diffstat (limited to 'models-sim/models-sim-dmaap/src/test')
20 files changed, 2035 insertions, 1 deletions
diff --git a/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/models/sim/dmaap/DmaapSimXxxExceptionTest.java b/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/models/sim/dmaap/DmaapSimXxxExceptionTest.java new file mode 100644 index 000000000..4e37a5e36 --- /dev/null +++ b/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/models/sim/dmaap/DmaapSimXxxExceptionTest.java @@ -0,0 +1,39 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.models.sim.dmaap; + +import static org.junit.Assert.assertEquals; + +import org.junit.Test; +import org.onap.policy.common.utils.test.ExceptionsTester; +import org.onap.policy.models.sim.dmaap.DmaapSimException; +import org.onap.policy.models.sim.dmaap.DmaapSimRuntimeException; + +public class DmaapSimXxxExceptionTest { + + @Test + public void testDmaapSimException() { + assertEquals(3, new ExceptionsTester().test(DmaapSimException.class)); + } + + @Test + public void testDmaapSimRuntimeException() { + assertEquals(3, new ExceptionsTester().test(DmaapSimRuntimeException.class)); + } +} diff --git a/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/models/sim/dmaap/provider/ConsumerGroupDataTest.java b/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/models/sim/dmaap/provider/ConsumerGroupDataTest.java new file mode 100644 index 000000000..4513ffb82 --- /dev/null +++ b/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/models/sim/dmaap/provider/ConsumerGroupDataTest.java @@ -0,0 +1,305 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP Policy Models + * ================================================================================ + * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.models.sim.dmaap.provider; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +public class ConsumerGroupDataTest { + private static final int WAIT_MS = 5000; + private static final int MIN_WAIT_MS = WAIT_MS / 2; + private static final String MY_TOPIC = "my-topic"; + private static final String MY_CONSUMER = "my-consumer"; + private static final String MSG1 = "hello"; + private static final String MSG2 = "there"; + private static final String MSG3 = "world"; + private static final int MAX_THREADS = 30; + + private MyData data; + private MyReader thread; + private List<MyReader> threads; + + /** + * Sets up. + */ + @Before + public void setUp() { + data = new MyData(); + thread = null; + threads = new ArrayList<>(MAX_THREADS); + } + + /** + * Stops any running thread. + */ + @After + public void tearDown() { + for (MyReader thr : threads) { + thr.interrupt(); + } + + for (MyReader thr : threads) { + thr.await(); + } + } + + @Test + public void testShouldRemove() throws InterruptedException { + assertFalse(data.shouldRemove()); + assertTrue(data.shouldRemove()); + + data = new MyData(); + + // start a reader thread and wait for it to poll its queue + startReader(0, 10); + assertTrue(data.await()); + + assertFalse(data.shouldRemove()); + } + + @Test + public void testRead() { + data.enqueue(MSG1, MSG2, MSG3, MSG1, MSG2, MSG3); + + // this reader only wants one + startReader(1, 1); + assertTrue(thread.await()); + assertEquals("[hello]", thread.result.toString()); + + // this reader wants three + startReader(3, 1); + assertTrue(thread.await()); + assertEquals("[there, world, hello]", thread.result.toString()); + + // this reader wants three, but will only get two + startReader(3, 1); + assertTrue(thread.await()); + assertEquals("[there, world]", thread.result.toString()); + } + + @Test + public void testRead_Idle() throws InterruptedException { + // force it to idle + data.shouldRemove(); + data.shouldRemove(); + + long tbeg = System.currentTimeMillis(); + assertSame(ConsumerGroupData.UNREADABLE_LIST, data.read(1, WAIT_MS)); + + // should not have waited + assertTrue(System.currentTimeMillis() < tbeg + MIN_WAIT_MS); + } + + @Test + public void testRead_NegativeCount() throws InterruptedException { + data.enqueue(MSG1, MSG2); + startReader(-1, 3); + assertTrue(data.await()); + + // wait time should be unaffected + assertEquals(3L, data.waitMs2); + + assertTrue(thread.await()); + + // should only return one message + assertEquals("[hello]", thread.result.toString()); + } + + @Test + public void testRead_NegativeWait() throws InterruptedException { + data.enqueue(MSG1, MSG2, MSG3); + startReader(2, -3); + assertTrue(data.await()); + + assertEquals(0L, data.waitMs2); + + assertTrue(thread.await()); + + // should return two messages, as requested + assertEquals("[hello, there]", thread.result.toString()); + } + + @Test + public void testRead_NoMessages() throws InterruptedException { + startReader(0, 0); + assertTrue(data.await()); + + assertTrue(thread.await()); + assertTrue(thread.result.isEmpty()); + } + + @Test + public void testRead_MultiThreaded() { + // queue up a bunch of messages + final int expected = MAX_THREADS * 3; + for (int x = 0; x < expected; ++x) { + data.enqueue(MSG1); + } + + for (int x = 0; x < MAX_THREADS; ++x) { + startReader(4, 1); + } + + int actual = 0; + for (MyReader thr : threads) { + thr.await(); + actual += thr.result.size(); + } + + assertEquals(expected, actual); + } + + + /** + * Starts a reader thread. + * + * @param limit number of messages to read at one time + * @param waitMs wait time, in milliseconds + */ + private void startReader(int limit, long waitMs) { + thread = new MyReader(limit, waitMs); + + thread.setDaemon(true); + thread.start(); + + threads.add(thread); + } + + + private class MyData extends ConsumerGroupData { + + /** + * Decremented when {@link #getNextMessage(long)} is invoked. + */ + private final CountDownLatch latch = new CountDownLatch(1); + + /** + * Messages to be added to the queue when {@link #getNextMessage(long)} is + * invoked. + */ + private final List<String> messages = new ArrayList<>(); + + /** + * Value passed to {@link #getNextMessage(long)}. + */ + private volatile long waitMs2 = -1; + + /** + * Constructs the object. + */ + public MyData() { + super(MY_TOPIC, MY_CONSUMER); + } + + /** + * Arranges for messages to be injected into the queue the next time + * {@link #getNextMessage(long)} is invoked. + * + * @param messages the messages to be injected + */ + public void enqueue(String... messages) { + this.messages.addAll(Arrays.asList(messages)); + } + + @Override + protected String getNextMessage(long waitMs) throws InterruptedException { + waitMs2 = waitMs; + + latch.countDown(); + + synchronized (messages) { + write(messages); + messages.clear(); + } + + return super.getNextMessage(waitMs); + } + + /** + * Waits for {@link #getNextMessage(long)} to be invoked. + * + * @return {@code true} if {@link #getNextMessage(long)} was invoked, + * {@code false} if the timer expired first + * @throws InterruptedException if the current thread is interrupted while waiting + */ + public boolean await() throws InterruptedException { + return latch.await(WAIT_MS, TimeUnit.MILLISECONDS); + } + } + + /** + * Thread that will invoke the consumer group's read() method one time. + */ + private class MyReader extends Thread { + private final ConsumerGroupData group = data; + private final int limit; + private final long waitMs; + + /** + * Result returned by the read() method. + */ + private List<String> result = Collections.emptyList(); + + public MyReader(int limit, long waitMs) { + this.limit = limit; + this.waitMs = waitMs; + } + + @Override + public void run() { + try { + result = group.read(limit, waitMs); + + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + + /** + * Waits for the thread to complete. + * + * @return {@code true} if the thread completed, {@code false} if the thread is + * still running + */ + public boolean await() { + try { + this.join(WAIT_MS); + + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + + return !this.isAlive(); + } + } +} diff --git a/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/models/sim/dmaap/provider/DmaapSimProviderTest.java b/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/models/sim/dmaap/provider/DmaapSimProviderTest.java new file mode 100644 index 000000000..f8c141614 --- /dev/null +++ b/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/models/sim/dmaap/provider/DmaapSimProviderTest.java @@ -0,0 +1,287 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.models.sim.dmaap.provider; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertSame; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyInt; +import static org.mockito.Matchers.anyLong; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import javax.ws.rs.core.Response; +import javax.ws.rs.core.Response.Status; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Captor; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; +import org.onap.policy.common.utils.coder.CoderException; +import org.onap.policy.common.utils.coder.StandardCoder; +import org.onap.policy.common.utils.coder.StandardCoderObject; +import org.onap.policy.models.sim.dmaap.parameters.DmaapSimParameterGroup; + +public class DmaapSimProviderTest { + private static final String EXPECTED_EXCEPTION = "expected exception"; + private static final long SWEEP_SEC = 10L; + private static final String TOPIC1 = "topic-A"; + private static final String TOPIC2 = "topic-B"; + private static final String CONSUMER1 = "consumer-X"; + private static final String CONSUMER_ID1 = "id1"; + + private MyProvider prov; + + @Mock + private DmaapSimParameterGroup params; + + @Mock + private ScheduledExecutorService timer; + + @Mock + private TopicData data1; + + @Mock + private TopicData data2; + + @Captor + private ArgumentCaptor<List<Object>> listCaptor; + + /** + * Sets up. + */ + @Before + public void setUp() { + MockitoAnnotations.initMocks(this); + + when(params.getTopicSweepSec()).thenReturn(SWEEP_SEC); + + prov = new MyProvider(params); + } + + /** + * Shuts down the provider, if it's running. + */ + @After + public void tearDown() { + if (prov.isAlive()) { + prov.shutdown(); + } + } + + /** + * Verifies that the constructor adds all of the expected actions to the service + * manager container. + */ + @Test + public void testDmaapSimProvider() { + prov.start(); + verify(timer).scheduleWithFixedDelay(any(), eq(SWEEP_SEC), eq(SWEEP_SEC), eq(TimeUnit.SECONDS)); + + prov.stop(); + verify(timer).shutdown(); + } + + @Test + public void testProcessDmaapMessagePut_List() throws CoderException { + prov = spy(new MyProvider(params)); + + when(data1.write(any())).thenReturn(2); + + // force topics to exist + prov.processDmaapMessageGet(TOPIC1, CONSUMER1, CONSUMER_ID1, 1, 0); + prov.processDmaapMessageGet(TOPIC2, CONSUMER1, CONSUMER_ID1, 1, 0); + + List<Object> lst = Arrays.asList("hello", "world"); + Response resp = prov.processDmaapMessagePut(TOPIC1, lst); + assertEquals(Status.OK.getStatusCode(), resp.getStatus()); + StandardCoderObject sco = new StandardCoder().decode(resp.getEntity().toString(), StandardCoderObject.class); + assertEquals("2", sco.getString("count")); + + List<Object> lst2 = Arrays.asList("helloB", "worldB"); + prov.processDmaapMessagePut(TOPIC1, lst2); + prov.processDmaapMessagePut(TOPIC2, lst2); + + // should only invoke this once for each topic + verify(prov).makeTopicData(TOPIC1); + verify(prov).makeTopicData(TOPIC2); + + // should process all writes + verify(data1).write(lst); + verify(data1).write(lst2); + + verify(data2).write(lst2); + } + + @Test + public void testProcessDmaapMessagePut_Single() throws CoderException { + prov = spy(new MyProvider(params)); + + // force topics to exist + prov.processDmaapMessageGet(TOPIC1, CONSUMER1, CONSUMER_ID1, 1, 0); + prov.processDmaapMessageGet(TOPIC2, CONSUMER1, CONSUMER_ID1, 1, 0); + + final String value1 = "abc"; + Response resp = prov.processDmaapMessagePut(TOPIC1, value1); + assertEquals(Status.OK.getStatusCode(), resp.getStatus()); + + // ensure that the response can be decoded + new StandardCoder().decode(resp.getEntity().toString(), StandardCoderObject.class); + + final String value2 = "def"; + prov.processDmaapMessagePut(TOPIC1, value2); + prov.processDmaapMessagePut(TOPIC2, value2); + + // should only invoke this once for each topic + verify(prov).makeTopicData(TOPIC1); + verify(prov).makeTopicData(TOPIC2); + + // should process all writes as singleton lists + listCaptor.getAllValues().clear(); + verify(data1, times(2)).write(listCaptor.capture()); + assertEquals(Collections.singletonList(value1), listCaptor.getAllValues().get(0)); + assertEquals(Collections.singletonList(value2), listCaptor.getAllValues().get(1)); + + listCaptor.getAllValues().clear(); + verify(data2).write(listCaptor.capture()); + assertEquals(Collections.singletonList(value2), listCaptor.getAllValues().get(0)); + } + + @Test + public void testProcessDmaapMessageGet() throws InterruptedException { + List<String> msgs = Arrays.asList("400", "500"); + when(data1.read(any(), anyInt(), anyLong())).thenReturn(msgs); + + Response resp = prov.processDmaapMessageGet(TOPIC1, CONSUMER1, CONSUMER_ID1, 4, 400L); + assertEquals(Status.OK.getStatusCode(), resp.getStatus()); + assertEquals(msgs.toString(), resp.getEntity().toString()); + } + + @Test + public void testProcessDmaapMessageGet_Timeout() throws InterruptedException { + when(data1.read(any(), anyInt(), anyLong())).thenReturn(Collections.emptyList()); + + Response resp = prov.processDmaapMessageGet(TOPIC1, CONSUMER1, CONSUMER_ID1, 3, 300L); + assertEquals(Status.REQUEST_TIMEOUT.getStatusCode(), resp.getStatus()); + assertEquals("[]", resp.getEntity().toString()); + } + + @Test + public void testProcessDmaapMessageGet_Ex() throws InterruptedException { + BlockingQueue<Response> respQueue = new LinkedBlockingQueue<>(); + + // put in a background thread so it doesn't interrupt the tester thread + new Thread(() -> { + try { + when(data1.read(any(), anyInt(), anyLong())).thenThrow(new InterruptedException(EXPECTED_EXCEPTION)); + respQueue.offer(prov.processDmaapMessageGet(TOPIC1, CONSUMER1, CONSUMER_ID1, 3, 300L)); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + }).start(); + + Response resp = respQueue.poll(3, TimeUnit.SECONDS); + assertNotNull(resp); + + assertEquals(Status.GONE.getStatusCode(), resp.getStatus()); + assertEquals("[]", resp.getEntity().toString()); + } + + @Test + public void testSweepTopicTaskRun() { + prov.start(); + prov.processDmaapMessageGet(TOPIC1, CONSUMER1, CONSUMER_ID1, 0, 0); + prov.processDmaapMessageGet(TOPIC2, CONSUMER1, CONSUMER_ID1, 0, 0); + + ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class); + verify(timer).scheduleWithFixedDelay(captor.capture(), anyLong(), anyLong(), any(TimeUnit.class)); + + captor.getValue().run(); + verify(data1).removeIdleConsumers(); + verify(data2).removeIdleConsumers(); + + // run it again + captor.getValue().run(); + verify(data1, times(2)).removeIdleConsumers(); + verify(data2, times(2)).removeIdleConsumers(); + } + + @Test + public void testMakeTimerPool() { + // use a real provider so we can test the real makeTimer() method + DmaapSimProvider prov2 = new DmaapSimProvider(params); + prov2.start(); + prov2.stop(); + } + + @Test + public void testMakeTopicData() { + // use a real provider so we can test the real makeTopicData() method + DmaapSimProvider prov2 = new DmaapSimProvider(params); + prov2.processDmaapMessageGet(TOPIC1, CONSUMER1, CONSUMER_ID1, 0, 0); + } + + @Test + public void testGetInstance_testSetInstance() { + DmaapSimProvider.setInstance(prov); + assertSame(prov, DmaapSimProvider.getInstance()); + + DmaapSimProvider.setInstance(null); + assertNull(DmaapSimProvider.getInstance()); + } + + + public class MyProvider extends DmaapSimProvider { + + public MyProvider(DmaapSimParameterGroup params) { + super(params); + } + + @Override + protected ScheduledExecutorService makeTimerPool() { + return timer; + } + + @Override + protected TopicData makeTopicData(String topicName) { + switch (topicName) { + case TOPIC1: + return data1; + case TOPIC2: + return data2; + default: + throw new IllegalArgumentException("unknown topic name: " + topicName); + } + } + } +} diff --git a/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/models/sim/dmaap/provider/TopicDataTest.java b/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/models/sim/dmaap/provider/TopicDataTest.java new file mode 100644 index 000000000..f7e1f5e6c --- /dev/null +++ b/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/models/sim/dmaap/provider/TopicDataTest.java @@ -0,0 +1,213 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP Policy Models + * ================================================================================ + * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.models.sim.dmaap.provider; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyInt; +import static org.mockito.Matchers.anyLong; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.util.Arrays; +import java.util.Collections; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; +import java.util.TreeSet; +import java.util.stream.Collectors; +import org.junit.Before; +import org.junit.Test; +import org.mockito.internal.util.reflection.Whitebox; +import org.onap.policy.common.utils.coder.Coder; +import org.onap.policy.common.utils.coder.CoderException; +import org.onap.policy.common.utils.coder.StandardCoder; + +public class TopicDataTest { + private static final String EXPECTED_EXCEPTION = "expected exception"; + private static final String GROUP1 = "group-A"; + private static final String GROUP2 = "group-B"; + private static final String GROUP3 = "group-C"; + + private TopicData data; + private ConsumerGroupData consgrp1; + private ConsumerGroupData consgrp2; + private ConsumerGroupData consgrp3; + private List<ConsumerGroupData> groups; + + /** + * Sets up mocks and the initial data object. + * + * @throws Exception if an error occurs + */ + @Before + public void setUp() throws Exception { + consgrp1 = mock(ConsumerGroupData.class); + consgrp2 = mock(ConsumerGroupData.class); + consgrp3 = mock(ConsumerGroupData.class); + + when(consgrp1.read(anyInt(), anyLong())).thenReturn(Collections.emptyList()); + when(consgrp2.read(anyInt(), anyLong())).thenReturn(Collections.emptyList()); + when(consgrp3.read(anyInt(), anyLong())).thenReturn(Collections.emptyList()); + + groups = new LinkedList<>(Arrays.asList(consgrp1, consgrp2, consgrp3)); + + data = new TopicData("my-topic") { + @Override + protected ConsumerGroupData makeData(String consumerGroup) { + return groups.remove(0); + } + }; + } + + @Test + public void testRemoveIdleConsumers() throws Exception { + // force two consumers into the map + data.read(GROUP1, 0, 0); + data.read(GROUP2, 0, 0); + data.read(GROUP3, 0, 0); + + // indicate that one should be removed + when(consgrp1.shouldRemove()).thenReturn(true); + + // sweep + data.removeIdleConsumers(); + + assertEquals("[group-B, group-C]", new TreeSet<>(getGroups().keySet()).toString()); + + // indicate that the others should be removed + when(consgrp2.shouldRemove()).thenReturn(true); + when(consgrp3.shouldRemove()).thenReturn(true); + + // sweep + data.removeIdleConsumers(); + + assertTrue(getGroups().isEmpty()); + } + + @Test + public void testRead() throws Exception { + List<String> lst = Collections.emptyList(); + + when(consgrp1.read(anyInt(), anyLong())).thenReturn(ConsumerGroupData.UNREADABLE_LIST) + .thenReturn(ConsumerGroupData.UNREADABLE_LIST).thenReturn(lst); + + assertSame(lst, data.read(GROUP1, 10, 20)); + + // should have invoked three times + verify(consgrp1, times(3)).read(anyInt(), anyLong()); + + // should have used the given values + verify(consgrp1, times(3)).read(10, 20); + + // should not have allocated more than one group + assertEquals(2, groups.size()); + } + + @Test + public void testRead_MultipleGroups() throws Exception { + List<String> lst1 = Collections.emptyList(); + when(consgrp1.read(anyInt(), anyLong())).thenReturn(lst1); + + List<String> lst2 = Collections.emptyList(); + when(consgrp2.read(anyInt(), anyLong())).thenReturn(lst2); + + // one from each group + assertSame(lst1, data.read(GROUP1, 0, 0)); + assertSame(lst2, data.read(GROUP2, 0, 0)); + + // repeat + assertSame(lst1, data.read(GROUP1, 0, 0)); + assertSame(lst2, data.read(GROUP2, 0, 0)); + + // again + assertSame(lst1, data.read(GROUP1, 0, 0)); + assertSame(lst2, data.read(GROUP2, 0, 0)); + + // should still have group3 in the list + assertEquals(1, groups.size()); + } + + @Test + public void testWrite() throws Exception { + // no groups yet + List<Object> messages = Arrays.asList("hello", "world"); + data.write(messages); + + // add two groups + data.read(GROUP1, 0, 0); + data.read(GROUP2, 0, 0); + + data.write(messages); + + // should have been written to both groups + List<String> strings = messages.stream().map(Object::toString).collect(Collectors.toList()); + verify(consgrp1).write(strings); + verify(consgrp2).write(strings); + } + + @Test + public void testConvertMessagesToStrings() { + assertEquals("[abc, 200]", data.convertMessagesToStrings(Arrays.asList("abc", null, 200)).toString()); + } + + @Test + public void testConvertMessageToString() throws CoderException { + Coder coder = new StandardCoder(); + + assertNull(data.convertMessageToString(null, coder)); + assertEquals("text-msg", data.convertMessageToString("text-msg", coder)); + assertEquals("100", data.convertMessageToString(100, coder)); + + coder = mock(Coder.class); + when(coder.encode(any())).thenThrow(new CoderException(EXPECTED_EXCEPTION)); + assertNull(data.convertMessageToString(new TreeMap<String,Object>(), coder)); + } + + @Test + public void testMakeData() throws Exception { + // use real objects instead of mocks + TopicData data2 = new TopicData("real-data-topic"); + + // force a group into the topic + data2.read(GROUP1, 0, 0); + + data2.write(Arrays.asList("abc", "def", "ghi")); + + assertEquals("[abc, def]", data2.read(GROUP1, 2, 0).toString()); + } + + /** + * Gets the consumer group map from the topic data object. + * + * @return the topic's consumer group map + */ + @SuppressWarnings("unchecked") + private Map<String, ConsumerGroupData> getGroups() { + return (Map<String, ConsumerGroupData>) Whitebox.getInternalState(data, "group2data"); + } +} diff --git a/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/models/sim/dmaap/rest/BaseRestControllerV1Test.java b/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/models/sim/dmaap/rest/BaseRestControllerV1Test.java new file mode 100644 index 000000000..5cba78355 --- /dev/null +++ b/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/models/sim/dmaap/rest/BaseRestControllerV1Test.java @@ -0,0 +1,63 @@ +/* + * ============LICENSE_START======================================================= + * ONAP PAP + * ================================================================================ + * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.models.sim.dmaap.rest; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + +import java.util.UUID; +import javax.ws.rs.core.Response; +import javax.ws.rs.core.Response.ResponseBuilder; +import org.junit.Before; +import org.junit.Test; + +public class BaseRestControllerV1Test { + + private BaseRestControllerV1 ctlr; + private ResponseBuilder bldr; + + @Before + public void setUp() { + ctlr = new BaseRestControllerV1(); + bldr = Response.status(Response.Status.OK); + } + + @Test + public void testAddVersionControlHeaders() { + Response resp = ctlr.addVersionControlHeaders(bldr).build(); + assertEquals("0", resp.getHeaderString(BaseRestControllerV1.VERSION_MINOR_NAME)); + assertEquals("0", resp.getHeaderString(BaseRestControllerV1.VERSION_PATCH_NAME)); + assertEquals("1.0.0", resp.getHeaderString(BaseRestControllerV1.VERSION_LATEST_NAME)); + } + + @Test + public void testAddLoggingHeaders_Null() { + Response resp = ctlr.addLoggingHeaders(bldr, null).build(); + assertNotNull(resp.getHeaderString(BaseRestControllerV1.REQUEST_ID_NAME)); + } + + @Test + public void testAddLoggingHeaders_NonNull() { + UUID uuid = UUID.randomUUID(); + Response resp = ctlr.addLoggingHeaders(bldr, uuid).build(); + assertEquals(uuid.toString(), resp.getHeaderString(BaseRestControllerV1.REQUEST_ID_NAME)); + } +} diff --git a/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/models/sim/dmaap/rest/CambriaMessageBodyHandlerTest.java b/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/models/sim/dmaap/rest/CambriaMessageBodyHandlerTest.java new file mode 100644 index 000000000..5d9186c75 --- /dev/null +++ b/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/models/sim/dmaap/rest/CambriaMessageBodyHandlerTest.java @@ -0,0 +1,145 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP Policy Models + * ================================================================================ + * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.models.sim.dmaap.rest; + +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.io.ByteArrayInputStream; +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStream; +import java.nio.charset.StandardCharsets; +import java.util.List; +import javax.ws.rs.core.MediaType; +import org.junit.Before; +import org.junit.Test; + +public class CambriaMessageBodyHandlerTest { + private static final String STD_INPUT = "1.3.XAbc"; + private static final String EXPECTED_OUTPUT = "[Abc]"; + + private CambriaMessageBodyHandler hdlr; + + @Before + public void setUp() { + hdlr = new CambriaMessageBodyHandler(); + } + + @Test + public void testIsReadable() { + assertTrue(hdlr.isReadable(null, null, null, MediaType.valueOf("application/cambria"))); + + assertFalse(hdlr.isReadable(null, null, null, null)); + assertFalse(hdlr.isReadable(null, null, null, MediaType.valueOf("application/other"))); + assertFalse(hdlr.isReadable(null, null, null, MediaType.valueOf("other/cambria"))); + } + + @Test + public void testReadFrom() throws IOException { + List<Object> lst = readStream("1.11.AMessageBody", "3.3.123Foo3.3.123Bar", "0.16.You can do that..8.Or that."); + assertEquals("[MessageBody, Foo, Bar, You can do that., Or that.]", lst.toString()); + + // empty stream + lst = readStream(); + assertEquals("[]", lst.toString()); + } + + @Test + public void testReadMessage_InvalidPartitionLength() { + assertThatThrownBy(() -> readStream("100000000.3.")).isInstanceOf(IOException.class) + .hasMessage("invalid partition length"); + } + + @Test + public void testReadMessage_InvalidMessageLength() { + assertThatThrownBy(() -> readStream("3.100000000.ABC")).isInstanceOf(IOException.class) + .hasMessage("invalid message length"); + } + + @Test + public void testSkipWhitespace() throws IOException { + // no white space + assertEquals(EXPECTED_OUTPUT, readStream(STD_INPUT).toString()); + + // single white space + assertEquals(EXPECTED_OUTPUT, readStream(" " + STD_INPUT).toString()); + + // multiple white spaces + assertEquals(EXPECTED_OUTPUT, readStream("\n\n\t" + STD_INPUT).toString()); + } + + @Test + public void testReadLength_NoDigits() throws IOException { + assertEquals("[]", readStream("..").toString()); + } + + @Test + public void testReadLength_NoDot() { + assertThatThrownBy(() -> readStream("3.2")).isInstanceOf(EOFException.class) + .hasMessage("missing '.' in 'length' field"); + } + + @Test + public void testReadLength_NonDigit() { + assertThatThrownBy(() -> readStream("3.2x.ABCde")).isInstanceOf(IOException.class) + .hasMessage("invalid character in 'length' field"); + } + + @Test + public void testReadLength_TooManyDigits() { + assertThatThrownBy(() -> readStream("3.12345678901234567890.ABCde")).isInstanceOf(IOException.class) + .hasMessage("too many digits in 'length' field"); + } + + @Test + public void testReadString_ZeroLength() throws IOException { + assertEquals("[]", readStream("1..X").toString()); + } + + @Test + public void testReadString_TooShort() { + assertThatThrownBy(() -> readStream(".5.me")).isInstanceOf(EOFException.class).hasMessageContaining("actual"); + } + + /** + * Reads a stream via the handler. + * + * @param text lines of text to be read + * @return the list of objects that were decoded from the stream + * @throws IOException if an error occurs + */ + private List<Object> readStream(String... text) throws IOException { + return hdlr.readFrom(null, null, null, null, null, makeStream(text)); + } + + /** + * Creates an input stream from lines of text. + * + * @param text lines of text + * @return an input stream + */ + private InputStream makeStream(String... text) { + return new ByteArrayInputStream(String.join("\n", text).getBytes(StandardCharsets.UTF_8)); + } +} diff --git a/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/models/sim/dmaap/rest/CommonRestServer.java b/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/models/sim/dmaap/rest/CommonRestServer.java new file mode 100644 index 000000000..7e30c5a4c --- /dev/null +++ b/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/models/sim/dmaap/rest/CommonRestServer.java @@ -0,0 +1,181 @@ +/* + * ============LICENSE_START======================================================= + * Modifications Copyright (C) 2019 AT&T Intellectual Property. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.models.sim.dmaap.rest; + +import java.io.File; +import java.io.FileOutputStream; +import java.nio.charset.StandardCharsets; +import javax.ws.rs.client.Client; +import javax.ws.rs.client.ClientBuilder; +import javax.ws.rs.client.Invocation; +import javax.ws.rs.client.WebTarget; +import javax.ws.rs.core.MediaType; +import lombok.Getter; +import org.glassfish.jersey.client.ClientProperties; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.onap.policy.common.endpoints.http.server.HttpServletServerFactoryInstance; +import org.onap.policy.common.gson.GsonMessageBodyHandler; +import org.onap.policy.common.utils.network.NetworkUtil; +import org.onap.policy.common.utils.services.Registry; +import org.onap.policy.models.sim.dmaap.DmaapSimException; +import org.onap.policy.models.sim.dmaap.startstop.Main; +import org.onap.policy.sim.dmaap.parameters.CommonTestData; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Common base class for rest server tests. + */ +public class CommonRestServer { + + private static final Logger LOGGER = LoggerFactory.getLogger(CommonRestServer.class); + + public static final String NOT_ALIVE = "not alive"; + public static final String ALIVE = "alive"; + public static final String SELF = "self"; + public static final String NAME = "DMaaP Simulator"; + public static final String ENDPOINT_PREFIX = "events/"; + + @Getter + private static int port; + + protected static String httpPrefix; + + private static Main main; + + /** + * Allocates a port for the server, writes a config file, and then starts Main. + * + * @throws Exception if an error occurs + */ + @BeforeClass + public static void setUpBeforeClass() throws Exception { + port = NetworkUtil.allocPort(); + + httpPrefix = "http://localhost:" + port + "/"; + + String json = new CommonTestData().getParameterGroupAsString(port); + makeConfigFile("src/test/resources/parameters/TestConfigParams.json", json); + + HttpServletServerFactoryInstance.getServerFactory().destroy(); + + startMain(); + } + + /** + * Stops Main. + */ + @AfterClass + public static void teardownAfterClass() { + try { + if (main != null) { + Main main2 = main; + main = null; + + main2.shutdown(); + } + + } catch (DmaapSimException exp) { + LOGGER.error("cannot stop main", exp); + } + } + + /** + * Set up. + * + * @throws Exception if an error occurs + */ + @Before + public void setUp() throws Exception { + // restart, if not currently running + if (main == null) { + startMain(); + } + } + + /** + * Makes a parameter configuration file. + * @param fileName name of the config file to be created + * @param json json to be written to the file + * + * @throws Exception if an error occurs + */ + protected static void makeConfigFile(String fileName, String json) throws Exception { + File file = new File(fileName); + file.deleteOnExit(); + + try (FileOutputStream output = new FileOutputStream(file)) { + output.write(json.getBytes(StandardCharsets.UTF_8)); + } + } + + /** + * Starts the "Main". + * + * @throws Exception if an error occurs + */ + private static void startMain() throws Exception { + Registry.newRegistry(); + + // make sure port is available + if (NetworkUtil.isTcpPortOpen("localhost", port, 1, 1L)) { + throw new IllegalStateException("port " + port + " is still in use"); + } + + final String[] simConfigParameters = {"-c", "src/test/resources/parameters/TestConfigParams.json"}; + + main = new Main(simConfigParameters); + + if (!NetworkUtil.isTcpPortOpen("localhost", port, 60, 1000L)) { + throw new IllegalStateException("server is not listening on port " + port); + } + } + + /** + * Sends a request to an endpoint. + * + * @param endpoint the target endpoint + * @return a request builder + * @throws Exception if an error occurs + */ + protected Invocation.Builder sendRequest(final String endpoint) throws Exception { + return sendFqeRequest(httpPrefix + ENDPOINT_PREFIX + endpoint); + } + + /** + * Sends a request to a fully qualified endpoint. + * + * @param fullyQualifiedEndpoint the fully qualified target endpoint + * @return a request builder + */ + protected Invocation.Builder sendFqeRequest(final String fullyQualifiedEndpoint) { + final Client client = ClientBuilder.newBuilder().build(); + + client.property(ClientProperties.METAINF_SERVICES_LOOKUP_DISABLE, "true"); + client.register(GsonMessageBodyHandler.class); + + final WebTarget webTarget = client.target(fullyQualifiedEndpoint); + + return webTarget.request(MediaType.APPLICATION_JSON); + } +} diff --git a/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/models/sim/dmaap/rest/DmaapSimRestControllerV1Test.java b/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/models/sim/dmaap/rest/DmaapSimRestControllerV1Test.java new file mode 100644 index 000000000..7b84d543d --- /dev/null +++ b/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/models/sim/dmaap/rest/DmaapSimRestControllerV1Test.java @@ -0,0 +1,94 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.models.sim.dmaap.rest; + +import static org.junit.Assert.assertEquals; + +import java.io.File; +import java.util.Arrays; +import java.util.Map; +import javax.ws.rs.core.Response; +import org.junit.Before; +import org.junit.Test; +import org.onap.policy.common.utils.coder.Coder; +import org.onap.policy.common.utils.coder.CoderException; +import org.onap.policy.common.utils.coder.StandardCoder; +import org.onap.policy.models.sim.dmaap.parameters.DmaapSimParameterGroup; +import org.onap.policy.models.sim.dmaap.provider.DmaapSimProvider; + +public class DmaapSimRestControllerV1Test { + private static final int LIMIT = 5; + private static final String TOPIC = "my-topic"; + private static final String TOPIC2 = "my-topic-B"; + private static final String MESSAGE = "my-message"; + private static final String MESSAGE2 = "my-message-B"; + private static final String CONSUMER = "my-consumer"; + private static final String CONSUMER_ID = "my-id"; + + private static Coder coder = new StandardCoder(); + + private DmaapSimRestControllerV1 rest; + + /** + * Creates the controller. + * + * @throws CoderException if the parameters cannot be loaded + */ + @Before + public void setUp() throws CoderException { + DmaapSimParameterGroup params = coder.decode(new File("src/test/resources/parameters/NormalParameters.json"), + DmaapSimParameterGroup.class); + DmaapSimProvider.setInstance(new DmaapSimProvider(params)); + rest = new DmaapSimRestControllerV1(); + } + + @Test + public void test() { + Response resp = rest.getDmaapMessage(TOPIC, CONSUMER, CONSUMER_ID, LIMIT, 0); + assertEquals(Response.Status.OK.getStatusCode(), resp.getStatus()); + assertEquals("[]", resp.getEntity().toString()); + + // add some messages + resp = rest.postDmaapMessage(TOPIC, Arrays.asList(MESSAGE, MESSAGE2)); + assertEquals(Response.Status.OK.getStatusCode(), resp.getStatus()); + assertEquals(2, getCount(resp)); + + resp = rest.postDmaapMessage(TOPIC2, Arrays.asList(MESSAGE, MESSAGE2, MESSAGE)); + assertEquals(Response.Status.OK.getStatusCode(), resp.getStatus()); + assertEquals(3, getCount(resp)); + + // hadn't registered with topic 2 so nothing expected from there + resp = rest.getDmaapMessage(TOPIC2, CONSUMER, CONSUMER_ID, LIMIT, 0); + assertEquals(Response.Status.OK.getStatusCode(), resp.getStatus()); + assertEquals("[]", resp.getEntity().toString()); + + // now read from topic 1 + resp = rest.getDmaapMessage(TOPIC, CONSUMER, CONSUMER_ID, LIMIT, 0); + assertEquals(Response.Status.OK.getStatusCode(), resp.getStatus()); + assertEquals("[my-message, my-message-B]", resp.getEntity().toString()); + } + + private int getCount(Response resp) { + @SuppressWarnings("unchecked") + Map<String, Object> map = (Map<String, Object>) resp.getEntity(); + + return (int) map.get("count"); + } + +} diff --git a/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/models/sim/dmaap/rest/TextMessageBodyHandlerTest.java b/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/models/sim/dmaap/rest/TextMessageBodyHandlerTest.java new file mode 100644 index 000000000..2dfbae980 --- /dev/null +++ b/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/models/sim/dmaap/rest/TextMessageBodyHandlerTest.java @@ -0,0 +1,81 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.models.sim.dmaap.rest; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.charset.StandardCharsets; +import java.util.List; +import javax.ws.rs.core.MediaType; +import org.junit.Before; +import org.junit.Test; + +public class TextMessageBodyHandlerTest { + private TextMessageBodyHandler hdlr; + + @Before + public void setUp() { + hdlr = new TextMessageBodyHandler(); + } + + @Test + public void testIsReadable() { + assertTrue(hdlr.isReadable(null, null, null, MediaType.valueOf("text/plain"))); + + assertFalse(hdlr.isReadable(null, null, null, null)); + assertFalse(hdlr.isReadable(null, null, null, MediaType.valueOf("text/other"))); + assertFalse(hdlr.isReadable(null, null, null, MediaType.valueOf("other/plain"))); + } + + @Test + public void testReadFrom() throws IOException { + List<Object> lst = readStream("hello", "world"); + assertEquals("[hello, world]", lst.toString()); + + // empty stream + lst = readStream(); + assertEquals("[]", lst.toString()); + } + + /** + * Reads a stream via the handler. + * + * @param text lines of text to be read + * @return the list of objects that were decoded from the stream + * @throws IOException if an error occurs + */ + private List<Object> readStream(String... text) throws IOException { + return hdlr.readFrom(null, null, null, null, null, makeStream(text)); + } + + /** + * Creates an input stream from lines of text. + * + * @param text lines of text + * @return an input stream + */ + private InputStream makeStream(String... text) { + return new ByteArrayInputStream(String.join("\n", text).getBytes(StandardCharsets.UTF_8)); + } +} diff --git a/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/sim/dmaap/e2e/EndToEndTest.java b/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/sim/dmaap/e2e/EndToEndTest.java new file mode 100644 index 000000000..8c35de64e --- /dev/null +++ b/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/sim/dmaap/e2e/EndToEndTest.java @@ -0,0 +1,199 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.sim.dmaap.e2e; + +import static org.junit.Assert.assertEquals; + +import java.io.File; +import java.io.PrintWriter; +import java.net.HttpURLConnection; +import java.net.URL; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.function.BiConsumer; +import org.junit.After; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager; +import org.onap.policy.common.endpoints.event.comm.TopicSink; +import org.onap.policy.common.endpoints.parameters.TopicParameterGroup; +import org.onap.policy.common.endpoints.parameters.TopicParameters; +import org.onap.policy.common.utils.coder.CoderException; +import org.onap.policy.common.utils.coder.StandardCoder; +import org.onap.policy.models.sim.dmaap.rest.CommonRestServer; + +/** + * This tests the simulator using dmaap endpoints to verify that it works from publisher + * to subscriber. + */ +public class EndToEndTest extends CommonRestServer { + private static final int MAX_WAIT_SEC = 5; + private static final String ORIG_TOPIC = "MY-TOPIC"; + private static final String ORIG_TOPIC2 = "MY-TOPIC-B"; + private static final int MAX_MSG = 200; + + private static int ntests = 0; + private static String topicJson; + + private TopicParameterGroup topicConfig; + + private String topic = "MY-TOPIC"; + private String topic2 = "MY-TOPIC-B"; + + /** + * Messages from the topic are placed here by the endpoint. + */ + private BlockingQueue<String> queue; + + /** + * Messages from topic 2 are placed here by the endpoint. + */ + private BlockingQueue<String> queue2; + + /** + * Starts the rest server. + * + * @throws Exception if an error occurs + */ + @BeforeClass + public static void setUpBeforeClass() throws Exception { + TopicEndpointManager.getManager().shutdown(); + + CommonRestServer.setUpBeforeClass(); + + topicJson = new String( + Files.readAllBytes(new File("src/test/resources/parameters/TopicParameters.json").toPath()), + StandardCharsets.UTF_8); + topicJson = topicJson.replace("${port}", String.valueOf(getPort())); + } + + /** + * Starts the topics. + * + * @throws CoderException if the parameters cannot be decoded + */ + @Before + @Override + public void setUp() throws CoderException { + queue = new LinkedBlockingQueue<>(); + queue2 = new LinkedBlockingQueue<>(); + + /* + * change topic names for each test so any listeners that may still exist will not + * grab new messages + */ + + ++ntests; + topic = "my-topic-" + ntests; + topic2 = "my-topic-b" + ntests; + + String json = topicJson.replace(ORIG_TOPIC2, topic2).replace(ORIG_TOPIC, topic); + + topicConfig = new StandardCoder().decode(json, TopicParameterGroup.class); + + TopicEndpointManager.getManager().addTopics(topicConfig); + TopicEndpointManager.getManager().start(); + } + + @After + public void tearDown() { + TopicEndpointManager.getManager().shutdown(); + } + + @Test + public void testWithTopicEndpointAtEachEnd() throws InterruptedException { + // register listeners to add events to appropriate queue + TopicEndpointManager.getManager().getDmaapTopicSource(topic) + .register((infra, topic, event) -> queue.add(event)); + TopicEndpointManager.getManager().getDmaapTopicSource(topic2) + .register((infra, topic, event) -> queue2.add(event)); + + // publish events + TopicSink sink = TopicEndpointManager.getManager().getDmaapTopicSink(topic); + TopicSink sink2 = TopicEndpointManager.getManager().getDmaapTopicSink(topic2); + for (int x = 0; x < MAX_MSG; ++x) { + sink.send("hello-" + x); + sink2.send("world-" + x); + } + + // verify events where received + for (int x = 0; x < MAX_MSG; ++x) { + assertEquals("message " + x, "hello-" + x, queue.poll(MAX_WAIT_SEC, TimeUnit.SECONDS)); + assertEquals("message " + x, "world-" + x, queue2.poll(MAX_WAIT_SEC, TimeUnit.SECONDS)); + } + } + + @Test + public void testCambriaFormat() throws Exception { + test("testCambriaFormat", "application/cambria", + (wtr, messages) -> messages.forEach(msg -> wtr.write("0." + msg.length() + "." + msg + "\n"))); + } + + @Test + public void testJson() throws Exception { + test("testJson", "application/json", (wtr, messages) -> wtr.write("[" + String.join(", ", messages) + "]")); + } + + @Test + public void testText() throws Exception { + test("testText", "text/plain", (wtr, messages) -> messages.forEach(wtr::println)); + } + + /** + * Uses a raw URL connection to ensure the server can process messages of the given + * media type. + * + * @param testName name of the test + * @param mediaType media type + * @param writeMessages function that writes messages to a PrintWriter + * @throws Exception if an error occurs + */ + private void test(String testName, String mediaType, BiConsumer<PrintWriter, List<String>> writeMessages) + throws Exception { + String msg1 = "{'abc':10.0}".replace('\'', '"'); + String msg2 = "{'def':20.0}".replace('\'', '"'); + + TopicEndpointManager.getManager().getDmaapTopicSource(topic) + .register((infra, topic, event) -> queue.add(event)); + + TopicParameters sinkcfg = topicConfig.getTopicSinks().get(0); + URL url = new URL(httpPrefix + "events/" + sinkcfg.getTopic()); + + HttpURLConnection conn = (HttpURLConnection) url.openConnection(); + conn.setRequestMethod("POST"); + conn.setRequestProperty("Content-type", mediaType); + conn.setDoOutput(true); + conn.connect(); + + try (PrintWriter wtr = new PrintWriter(conn.getOutputStream())) { + writeMessages.accept(wtr, Arrays.asList(msg1, msg2)); + } + + assertEquals(testName + " response code", HttpURLConnection.HTTP_OK, conn.getResponseCode()); + + assertEquals(testName + " message 1", msg1, queue.poll(MAX_WAIT_SEC, TimeUnit.SECONDS)); + assertEquals(testName + " message 2", msg2, queue.poll(MAX_WAIT_SEC, TimeUnit.SECONDS)); + } +} diff --git a/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/sim/dmaap/parameters/CommonTestData.java b/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/sim/dmaap/parameters/CommonTestData.java new file mode 100644 index 000000000..21d9ed604 --- /dev/null +++ b/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/sim/dmaap/parameters/CommonTestData.java @@ -0,0 +1,89 @@ +/*- + * ============LICENSE_START======================================================= + * Modifications Copyright (C) 2019 AT&T Intellectual Property. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.sim.dmaap.parameters; + +import java.io.File; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import org.onap.policy.common.utils.coder.Coder; +import org.onap.policy.common.utils.coder.CoderException; +import org.onap.policy.common.utils.coder.StandardCoder; +import org.onap.policy.models.sim.dmaap.DmaapSimRuntimeException; +import org.onap.policy.models.sim.dmaap.parameters.DmaapSimParameterGroup; + +/** + * Class to hold/create all parameters for test cases. + */ +public class CommonTestData { + public static final String SIM_GROUP_NAME = "DMaapSim"; + + private static final Coder coder = new StandardCoder(); + + /** + * Gets the standard simulator parameters. + * + * @param port port to be inserted into the parameters + * @return the standard simulator parameters + */ + public DmaapSimParameterGroup getParameterGroup(int port) { + try { + return coder.decode(getParameterGroupAsString(port), DmaapSimParameterGroup.class); + + } catch (CoderException e) { + throw new DmaapSimRuntimeException("cannot read simulator parameters", e); + } + } + + /** + * Gets the standard simulator parameters, as a String. + * + * @param port port to be inserted into the parameters + * @return the standard simulator parameters + */ + public String getParameterGroupAsString(int port) { + + try { + File file = new File("src/test/resources/parameters/NormalParameters.json"); + String json = new String(Files.readAllBytes(file.toPath()), StandardCharsets.UTF_8); + + json = json.replace("6845", String.valueOf(port)); + + return json; + + } catch (IOException e) { + throw new DmaapSimRuntimeException("cannot read simulator parameters", e); + } + } + + /** + * Nulls out a field within a JSON string. It does it by adding a field with the same + * name, having a null value, and then prefixing the original field name with "Xxx", + * thus causing the original field and value to be ignored. + * + * @param json JSON string + * @param field field to be nulled out + * @return a new JSON string with the field nulled out + */ + public String nullifyField(String json, String field) { + return json.replace(field + "\"", field + "\":null, \"" + field + "Xxx\""); + } +} diff --git a/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/sim/dmaap/parameters/DmaapSimParameterGroupTest.java b/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/sim/dmaap/parameters/DmaapSimParameterGroupTest.java new file mode 100644 index 000000000..828cd89b0 --- /dev/null +++ b/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/sim/dmaap/parameters/DmaapSimParameterGroupTest.java @@ -0,0 +1,34 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.sim.dmaap.parameters; + +import static org.junit.Assert.assertEquals; + +import org.junit.Test; +import org.onap.policy.models.sim.dmaap.parameters.DmaapSimParameterGroup; + +public class DmaapSimParameterGroupTest { + private static final String MY_NAME = "my-name"; + + @Test + public void testDmaapSimParameterGroup() { + DmaapSimParameterGroup params = new DmaapSimParameterGroup(MY_NAME); + assertEquals(MY_NAME, params.getName()); + } +} diff --git a/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/sim/dmaap/parameters/DmaapSimParameterHandlerTest.java b/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/sim/dmaap/parameters/DmaapSimParameterHandlerTest.java new file mode 100644 index 000000000..8f053d219 --- /dev/null +++ b/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/sim/dmaap/parameters/DmaapSimParameterHandlerTest.java @@ -0,0 +1,70 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.sim.dmaap.parameters; + +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + +import org.junit.Before; +import org.junit.Test; +import org.onap.policy.models.sim.dmaap.DmaapSimException; +import org.onap.policy.models.sim.dmaap.parameters.DmaapSimParameterGroup; +import org.onap.policy.models.sim.dmaap.parameters.DmaapSimParameterHandler; +import org.onap.policy.models.sim.dmaap.startstop.DmaapSimCommandLineArguments; + +public class DmaapSimParameterHandlerTest { + + private static final String RESOURCE_DIR = "src/test/resources/parameters/"; + + private DmaapSimParameterHandler handler; + + @Before + public void setUp() { + handler = new DmaapSimParameterHandler(); + } + + @Test + public void testGetParameters() throws DmaapSimException { + final DmaapSimCommandLineArguments args = new DmaapSimCommandLineArguments(); + + args.parse(new String[] {"-c", RESOURCE_DIR + "NormalParameters.json"}); + DmaapSimParameterGroup params = handler.getParameters(args); + assertNotNull(params); + assertEquals("DMaapSim", params.getName()); + assertEquals(300L, params.getTopicSweepSec()); + assertEquals(6845, params.getRestServerParameters().getPort()); + + + args.parse(new String[] {"-c", "FileNotFound.json"}); + assertThatThrownBy(() -> handler.getParameters(args)).isInstanceOf(DmaapSimException.class) + .hasMessageStartingWith("error reading parameters"); + + + args.parse(new String[] {"-c", RESOURCE_DIR + "EmptyParameterFile.json"}); + assertThatThrownBy(() -> handler.getParameters(args)).isInstanceOf(DmaapSimException.class) + .hasMessageStartingWith("no parameters found"); + + + args.parse(new String[] {"-c", RESOURCE_DIR + "Parameters_InvalidName.json"}); + assertThatThrownBy(() -> handler.getParameters(args)).isInstanceOf(DmaapSimException.class) + .hasMessageContaining("validation error"); + } + +} diff --git a/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/sim/dmaap/startstop/DmaapSimActivatorTest.java b/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/sim/dmaap/startstop/DmaapSimActivatorTest.java new file mode 100644 index 000000000..380a72423 --- /dev/null +++ b/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/sim/dmaap/startstop/DmaapSimActivatorTest.java @@ -0,0 +1,95 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2019 AT&T Intellectual Property. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.sim.dmaap.startstop; + +import static org.assertj.core.api.Assertions.assertThatIllegalStateException; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.onap.policy.common.endpoints.http.server.HttpServletServerFactoryInstance; +import org.onap.policy.common.utils.services.Registry; +import org.onap.policy.models.sim.dmaap.parameters.DmaapSimParameterGroup; +import org.onap.policy.models.sim.dmaap.parameters.DmaapSimParameterHandler; +import org.onap.policy.models.sim.dmaap.startstop.DmaapSimActivator; +import org.onap.policy.models.sim.dmaap.startstop.DmaapSimCommandLineArguments; + + +/** + * Class to perform unit test of {@link DmaapSimActivator}}. + */ +public class DmaapSimActivatorTest { + + private DmaapSimActivator activator; + + /** + * Initializes an activator. + * + * @throws Exception if an error occurs + */ + @Before + public void setUp() throws Exception { + Registry.newRegistry(); + HttpServletServerFactoryInstance.getServerFactory().destroy(); + + final String[] papConfigParameters = {"-c", "parameters/NormalParameters.json"}; + final DmaapSimCommandLineArguments arguments = new DmaapSimCommandLineArguments(papConfigParameters); + final DmaapSimParameterGroup parGroup = new DmaapSimParameterHandler().getParameters(arguments); + + activator = new DmaapSimActivator(parGroup); + } + + /** + * Method for cleanup after each test. + * + * @throws Exception if an error occurs + */ + @After + public void teardown() throws Exception { + if (activator != null && activator.isAlive()) { + activator.stop(); + } + } + + @Test + public void testDmaapSimActivator() { + assertFalse(activator.isAlive()); + activator.start(); + assertTrue(activator.isAlive()); + + // repeat - should throw an exception + assertThatIllegalStateException().isThrownBy(() -> activator.start()); + assertTrue(activator.isAlive()); + } + + @Test + public void testTerminate() { + activator.start(); + activator.stop(); + assertFalse(activator.isAlive()); + + // repeat - should throw an exception + assertThatIllegalStateException().isThrownBy(() -> activator.stop()); + assertFalse(activator.isAlive()); + } +} diff --git a/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/sim/dmaap/startstop/MainTest.java b/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/sim/dmaap/startstop/MainTest.java new file mode 100644 index 000000000..b8e285a99 --- /dev/null +++ b/models-sim/models-sim-dmaap/src/test/java/org/onap/policy/sim/dmaap/startstop/MainTest.java @@ -0,0 +1,100 @@ +/* + * ============LICENSE_START======================================================= + * Modifications Copyright (C) 2019 AT&T Intellectual Property. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.sim.dmaap.startstop; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.onap.policy.common.endpoints.http.server.HttpServletServerFactoryInstance; +import org.onap.policy.models.sim.dmaap.DmaapSimException; +import org.onap.policy.models.sim.dmaap.startstop.Main; +import org.onap.policy.sim.dmaap.parameters.CommonTestData; + +/** + * Class to perform unit test of {@link Main}}. + * + * @author Ram Krishna Verma (ram.krishna.verma@est.tech) + */ +public class MainTest { + private Main main; + + /** + * Set up. + */ + @Before + public void setUp() { + main = null; + HttpServletServerFactoryInstance.getServerFactory().destroy(); + } + + /** + * Shuts "main" down. + * + * @throws Exception if an error occurs + */ + @After + public void tearDown() throws Exception { + if (main != null) { + main.shutdown(); + } + } + + @Test + public void testMain() throws DmaapSimException { + final String[] NormalParameters = {"-c", "parameters/NormalParameters.json"}; + main = new Main(NormalParameters); + assertTrue(main.getParameters().isValid()); + assertEquals(CommonTestData.SIM_GROUP_NAME, main.getParameters().getName()); + + main.shutdown(); + } + + @Test + public void testMain_NoArguments() { + final String[] NormalParameters = {}; + main = new Main(NormalParameters); + assertTrue(main.getParameters() == null); + } + + @Test + public void testMain_InvalidArguments() { + // note: this is missing the "-c" argument, thus the ARGUMENTS are invalid + final String[] NormalParameters = {"parameters/NormalParameters.json"}; + main = new Main(NormalParameters); + assertTrue(main.getParameters() == null); + } + + @Test + public void testMain_Help() { + final String[] NormalParameters = {"-h"}; + Main.main(NormalParameters); + } + + @Test + public void testMain_InvalidParameters() { + final String[] NormalParameters = {"-c", "parameters/InvalidParameters.json"}; + main = new Main(NormalParameters); + assertTrue(main.getParameters() == null); + } +} diff --git a/models-sim/models-sim-dmaap/src/test/resources/parameters/EmptyParameterFile.json b/models-sim/models-sim-dmaap/src/test/resources/parameters/EmptyParameterFile.json new file mode 100644 index 000000000..e69de29bb --- /dev/null +++ b/models-sim/models-sim-dmaap/src/test/resources/parameters/EmptyParameterFile.json diff --git a/models-sim/models-sim-dmaap/src/test/resources/parameters/MinimumParameters.json b/models-sim/models-sim-dmaap/src/test/resources/parameters/MinimumParameters.json index aeedf9d6e..a1c98a5b1 100644 --- a/models-sim/models-sim-dmaap/src/test/resources/parameters/MinimumParameters.json +++ b/models-sim/models-sim-dmaap/src/test/resources/parameters/MinimumParameters.json @@ -1,5 +1,6 @@ -{ +{ "name":"DMaapSim", + "topicSweepSec": 1, "restServerParameters":{ "host":"0.0.0.0", "port":6845 diff --git a/models-sim/models-sim-dmaap/src/test/resources/parameters/NormalParameters.json b/models-sim/models-sim-dmaap/src/test/resources/parameters/NormalParameters.json index a2a036645..deec966e8 100644 --- a/models-sim/models-sim-dmaap/src/test/resources/parameters/NormalParameters.json +++ b/models-sim/models-sim-dmaap/src/test/resources/parameters/NormalParameters.json @@ -1,5 +1,6 @@ { "name": "DMaapSim", + "topicSweepSec": 300, "restServerParameters": { "host": "0.0.0.0", "port": 6845 diff --git a/models-sim/models-sim-dmaap/src/test/resources/parameters/Parameters_InvalidName.json b/models-sim/models-sim-dmaap/src/test/resources/parameters/Parameters_InvalidName.json index fba033e52..51e9458b0 100644 --- a/models-sim/models-sim-dmaap/src/test/resources/parameters/Parameters_InvalidName.json +++ b/models-sim/models-sim-dmaap/src/test/resources/parameters/Parameters_InvalidName.json @@ -1,5 +1,6 @@ { "name":" ", + "topicSweepSec": 1, "restServerParameters":{ "host":"0.0.0.0", "port":6969, diff --git a/models-sim/models-sim-dmaap/src/test/resources/parameters/TopicParameters.json b/models-sim/models-sim-dmaap/src/test/resources/parameters/TopicParameters.json new file mode 100644 index 000000000..77a320f6d --- /dev/null +++ b/models-sim/models-sim-dmaap/src/test/resources/parameters/TopicParameters.json @@ -0,0 +1,36 @@ +{ + "topicSources": [ + { + "topic": "MY-TOPIC", + "servers": [ + "localhost:${port}" + ], + "topicCommInfrastructure": "dmaap", + "fetchTimeout": 100 + }, + { + "topic": "MY-TOPIC-B", + "servers": [ + "localhost:${port}" + ], + "topicCommInfrastructure": "dmaap", + "fetchTimeout": 100 + } + ], + "topicSinks": [ + { + "topic": "MY-TOPIC", + "servers": [ + "localhost:${port}" + ], + "topicCommInfrastructure": "dmaap" + }, + { + "topic": "MY-TOPIC-B", + "servers": [ + "localhost:${port}" + ], + "topicCommInfrastructure": "dmaap" + } + ] +}
\ No newline at end of file |