aboutsummaryrefslogtreecommitdiffstats
path: root/dcae-analytics-dmaap/src/test/java/org/openecomp/dcae/apod/analytics/dmaap/service/publisher
diff options
context:
space:
mode:
Diffstat (limited to 'dcae-analytics-dmaap/src/test/java/org/openecomp/dcae/apod/analytics/dmaap/service/publisher')
-rw-r--r--dcae-analytics-dmaap/src/test/java/org/openecomp/dcae/apod/analytics/dmaap/service/publisher/DMaaPMRPublisherImplTest.java212
-rw-r--r--dcae-analytics-dmaap/src/test/java/org/openecomp/dcae/apod/analytics/dmaap/service/publisher/DMaaPMRPublisherMockImpl.java59
-rw-r--r--dcae-analytics-dmaap/src/test/java/org/openecomp/dcae/apod/analytics/dmaap/service/publisher/DMaaPMRPublisherQueueImplTest.java189
-rw-r--r--dcae-analytics-dmaap/src/test/java/org/openecomp/dcae/apod/analytics/dmaap/service/publisher/DMaaPMRPublisherQueueMockImpl.java53
4 files changed, 513 insertions, 0 deletions
diff --git a/dcae-analytics-dmaap/src/test/java/org/openecomp/dcae/apod/analytics/dmaap/service/publisher/DMaaPMRPublisherImplTest.java b/dcae-analytics-dmaap/src/test/java/org/openecomp/dcae/apod/analytics/dmaap/service/publisher/DMaaPMRPublisherImplTest.java
new file mode 100644
index 0000000..a67e777
--- /dev/null
+++ b/dcae-analytics-dmaap/src/test/java/org/openecomp/dcae/apod/analytics/dmaap/service/publisher/DMaaPMRPublisherImplTest.java
@@ -0,0 +1,212 @@
+/*
+ * ===============================LICENSE_START======================================
+ * dcae-analytics
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============================LICENSE_END===========================================
+ */
+
+package org.openecomp.dcae.apod.analytics.dmaap.service.publisher;
+
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.http.client.ResponseHandler;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.client.methods.HttpUriRequest;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.runners.MockitoJUnitRunner;
+import org.openecomp.dcae.apod.analytics.common.exception.DCAEAnalyticsRuntimeException;
+import org.openecomp.dcae.apod.analytics.dmaap.BaseAnalyticsDMaaPUnitTest;
+import org.openecomp.dcae.apod.analytics.dmaap.domain.config.DMaaPMRPublisherConfig;
+import org.openecomp.dcae.apod.analytics.dmaap.domain.response.DMaaPMRPublisherResponse;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+import static org.hamcrest.CoreMatchers.isA;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+import static org.mockito.BDDMockito.given;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+/**
+ * @author Rajiv Singla . Creation Date: 10/21/2016.
+ */
+@RunWith(MockitoJUnitRunner.class)
+public class DMaaPMRPublisherImplTest extends BaseAnalyticsDMaaPUnitTest {
+
+ @Mock
+ DMaaPMRPublisherQueueFactory dmaapMRPublisherQueueFactory;
+ @Mock
+ CloseableHttpClient closeableHttpClient;
+ @Mock
+ DMaaPMRPublisherQueue dmaapMRPublisherQueue;
+
+ @Before
+ public void setUp() throws Exception {
+ given(dmaapMRPublisherQueueFactory.create(Mockito.anyInt(), Mockito.anyInt()))
+ .willReturn(dmaapMRPublisherQueue);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ }
+
+ @Test
+ public void testPublishSmallMessageList() throws Exception {
+ given(dmaapMRPublisherQueue.getBatchQueueRemainingSize()).willReturn(10);
+ given(dmaapMRPublisherQueue.addBatchMessages(Mockito.<String>anyList())).willReturn(2);
+
+ DMaaPMRPublisherImpl dmaapMRPublisherImpl = new DMaaPMRPublisherImpl(
+ getPublisherConfig(), dmaapMRPublisherQueueFactory, closeableHttpClient);
+
+ DMaaPMRPublisherResponse dmaapMRPublisherResponse = dmaapMRPublisherImpl.publish(getTwoSampleMessages());
+
+ assertThat(dmaapMRPublisherResponse.getResponseCode(), is(202));
+ assertThat(dmaapMRPublisherResponse.getPendingMessagesCount(), is(2));
+ assertThat(dmaapMRPublisherResponse.getResponseMessage(),
+ is("Accepted - Messages queued for batch publishing to MR Topic"));
+ }
+
+ @Test
+ public void testPublishBigMessageList() throws Exception {
+
+ given(dmaapMRPublisherQueue.getBatchQueueRemainingSize()).willReturn(0);
+ given(dmaapMRPublisherQueue.getMessageForPublishing()).willReturn(getTwoSampleMessages());
+ Mockito.when(
+ closeableHttpClient.execute(Mockito.any(HttpUriRequest.class), Mockito.any(ResponseHandler.class)))
+ .thenReturn(new ImmutablePair<>(200, "Message successfully posted"));
+
+ DMaaPMRPublisherImpl dmaapMRPublisherImpl = new DMaaPMRPublisherImpl(
+ getPublisherConfig(), dmaapMRPublisherQueueFactory, closeableHttpClient);
+
+ DMaaPMRPublisherResponse dmaapMRPublisherResponse = dmaapMRPublisherImpl.publish(getTwoSampleMessages());
+
+ assertThat(dmaapMRPublisherResponse.getResponseCode(), is(200));
+ assertThat(dmaapMRPublisherResponse.getPendingMessagesCount(), is(200));
+ assertThat(dmaapMRPublisherResponse.getResponseMessage(), is("Message successfully posted"));
+ }
+
+ @Test
+ public void testForcePublishSuccessful() throws Exception {
+ DMaaPMRPublisherConfig dmaapMRPublisherConfig = new
+ DMaaPMRPublisherConfig.Builder(HOST_NAME, TOPIC_NAME)
+ .setPortNumber(PORT_NUMBER)
+ .setProtocol(HTTP_PROTOCOL)
+ .setContentType(CONTENT_TYPE)
+ .setMaxRecoveryQueueSize(PUBLISHER_MAX_RECOVERY_QUEUE_SIZE)
+ .setMaxBatchSize(PUBLISHER_MAX_BATCH_QUEUE_SIZE).build();
+
+ HttpPost httpPost = Mockito.mock(HttpPost.class);
+ Mockito.when(closeableHttpClient.execute(
+ Mockito.any(HttpUriRequest.class), Mockito.any(ResponseHandler.class)))
+ .thenReturn(new ImmutablePair<>(200, "Message successfully posted"));
+
+ DMaaPMRPublisherImpl dmaapMRPublisherImpl = new DMaaPMRPublisherImpl(
+ dmaapMRPublisherConfig, dmaapMRPublisherQueueFactory, closeableHttpClient);
+ DMaaPMRPublisherResponse response = dmaapMRPublisherImpl.forcePublish(getTwoSampleMessages());
+ assertThat(response.getResponseCode(), is(200));
+ }
+
+ @Test
+ public void testForcePublishFailure() throws Exception {
+ Mockito.when(closeableHttpClient.execute(
+ Mockito.any(HttpUriRequest.class), Mockito.any(ResponseHandler.class)))
+ .thenReturn(new ImmutablePair<>(503, "Message successfully posted"));
+
+ DMaaPMRPublisherImpl dmaapMRPublisherImpl = new DMaaPMRPublisherImpl(
+ getPublisherConfig(), dmaapMRPublisherQueueFactory, closeableHttpClient);
+ DMaaPMRPublisherResponse response = dmaapMRPublisherImpl.forcePublish(getTwoSampleMessages());
+ assertThat(response.getResponseCode(), is(503));
+ }
+
+ @Rule
+ public ExpectedException httpIOException = ExpectedException.none();
+
+ @Test
+ public void testForcePublishHttpFailure() throws Exception {
+
+ httpIOException.expect(DCAEAnalyticsRuntimeException.class);
+ httpIOException.expectCause(isA(IOException.class));
+
+ given(closeableHttpClient.execute(
+ Mockito.any(HttpUriRequest.class), Mockito.any(ResponseHandler.class))).willThrow(IOException.class);
+
+ DMaaPMRPublisherImpl dmaapMRPublisherImpl = new DMaaPMRPublisherImpl(
+ getPublisherConfig(), dmaapMRPublisherQueueFactory, closeableHttpClient);
+ DMaaPMRPublisherResponse response = dmaapMRPublisherImpl.forcePublish(getTwoSampleMessages());
+ }
+
+ @Test
+ public void testFlushSuccessful() throws Exception {
+ Mockito.when(closeableHttpClient.execute(
+ Mockito.any(HttpUriRequest.class), Mockito.any(ResponseHandler.class)))
+ .thenReturn(new ImmutablePair<>(200, "Message successfully posted"));
+
+ Mockito.when(dmaapMRPublisherQueue.getMessageForPublishing()).thenReturn(getTwoSampleMessages());
+
+ DMaaPMRPublisherImpl dmaapMRPublisherImpl = new DMaaPMRPublisherImpl(
+ getPublisherConfig(), dmaapMRPublisherQueueFactory, closeableHttpClient);
+ DMaaPMRPublisherResponse response = dmaapMRPublisherImpl.flush();
+ assertThat(response.getResponseCode(), is(200));
+ }
+
+ @Test
+ public void testFlushEmptyList() throws Exception {
+ Mockito.when(dmaapMRPublisherQueue.getMessageForPublishing()).thenReturn(new ArrayList());
+
+ DMaaPMRPublisherImpl dmaapMRPublisherImpl = new DMaaPMRPublisherImpl(
+ getPublisherConfig(), dmaapMRPublisherQueueFactory, closeableHttpClient);
+ DMaaPMRPublisherResponse response = dmaapMRPublisherImpl.flush();
+ assertThat(response.getResponseCode(), is(204));
+ }
+
+ @Test
+ public void testClose() throws Exception {
+ Mockito.when(dmaapMRPublisherQueue.getMessageForPublishing()).thenReturn(new ArrayList());
+ Mockito.when(closeableHttpClient.execute(
+ Mockito.any(HttpUriRequest.class), Mockito.any(ResponseHandler.class)))
+ .thenReturn(new ImmutablePair<>(200, "Message successfully posted"));
+ Mockito.when(dmaapMRPublisherQueue.getMessageForPublishing()).thenReturn(getTwoSampleMessages());
+
+ DMaaPMRPublisherImpl dmaapMRPublisherImpl = new DMaaPMRPublisherImpl(
+ getPublisherConfig(), dmaapMRPublisherQueueFactory, closeableHttpClient);
+ dmaapMRPublisherImpl.close();
+ verify(closeableHttpClient).execute(Mockito.any(HttpUriRequest.class), Mockito.any(ResponseHandler.class));
+ }
+
+ @Test
+ public void testCloseUnsuccessful() throws Exception {
+ Mockito.when(dmaapMRPublisherQueue.getMessageForPublishing()).thenReturn(new ArrayList());
+ Mockito.when(closeableHttpClient.execute(
+ Mockito.any(HttpUriRequest.class), Mockito.any(ResponseHandler.class)))
+ .thenReturn(new ImmutablePair<>(400, "Message successfully posted"));
+ Mockito.when(dmaapMRPublisherQueue.getMessageForPublishing()).thenReturn(getTwoSampleMessages());
+
+ DMaaPMRPublisherImpl dmaapMRPublisherImpl = new DMaaPMRPublisherImpl(
+ getPublisherConfig(), dmaapMRPublisherQueueFactory, closeableHttpClient);
+ dmaapMRPublisherImpl.close();
+ verify(closeableHttpClient, times(6)).execute(Mockito.any(HttpUriRequest.class),
+ Mockito.any(ResponseHandler.class));
+ }
+}
diff --git a/dcae-analytics-dmaap/src/test/java/org/openecomp/dcae/apod/analytics/dmaap/service/publisher/DMaaPMRPublisherMockImpl.java b/dcae-analytics-dmaap/src/test/java/org/openecomp/dcae/apod/analytics/dmaap/service/publisher/DMaaPMRPublisherMockImpl.java
new file mode 100644
index 0000000..263660b
--- /dev/null
+++ b/dcae-analytics-dmaap/src/test/java/org/openecomp/dcae/apod/analytics/dmaap/service/publisher/DMaaPMRPublisherMockImpl.java
@@ -0,0 +1,59 @@
+/*
+ * ===============================LICENSE_START======================================
+ * dcae-analytics
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============================LICENSE_END===========================================
+ */
+
+package org.openecomp.dcae.apod.analytics.dmaap.service.publisher;
+
+import org.openecomp.dcae.apod.analytics.common.exception.DCAEAnalyticsRuntimeException;
+import org.openecomp.dcae.apod.analytics.dmaap.domain.response.DMaaPMRPublisherResponse;
+import org.openecomp.dcae.apod.analytics.dmaap.domain.response.DMaaPMRPublisherResponseImpl;
+
+import java.util.Date;
+import java.util.List;
+
+/**
+ * @author Rajiv Singla . Creation Date: 10/21/2016.
+ */
+public class DMaaPMRPublisherMockImpl implements DMaaPMRPublisher {
+
+ @Override
+ public DMaaPMRPublisherResponse publish(List<String> messages) throws DCAEAnalyticsRuntimeException {
+ return new DMaaPMRPublisherResponseImpl(102, "Mock Response", 100);
+ }
+
+ @Override
+ public DMaaPMRPublisherResponse forcePublish(List<String> messages) throws DCAEAnalyticsRuntimeException {
+ return null;
+ }
+
+ @Override
+ public DMaaPMRPublisherResponse flush() {
+ return null;
+ }
+
+ @Override
+ public Date getPublisherCreationTime() {
+ return null;
+ }
+
+ @Override
+ public void close() throws Exception {
+
+ }
+}
diff --git a/dcae-analytics-dmaap/src/test/java/org/openecomp/dcae/apod/analytics/dmaap/service/publisher/DMaaPMRPublisherQueueImplTest.java b/dcae-analytics-dmaap/src/test/java/org/openecomp/dcae/apod/analytics/dmaap/service/publisher/DMaaPMRPublisherQueueImplTest.java
new file mode 100644
index 0000000..e854716
--- /dev/null
+++ b/dcae-analytics-dmaap/src/test/java/org/openecomp/dcae/apod/analytics/dmaap/service/publisher/DMaaPMRPublisherQueueImplTest.java
@@ -0,0 +1,189 @@
+/*
+ * ===============================LICENSE_START======================================
+ * dcae-analytics
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============================LICENSE_END===========================================
+ */
+
+package org.openecomp.dcae.apod.analytics.dmaap.service.publisher;
+
+import org.junit.Test;
+import org.openecomp.dcae.apod.analytics.dmaap.BaseAnalyticsDMaaPUnitTest;
+
+import java.util.List;
+
+import static org.junit.Assert.assertTrue;
+
+/**
+ *
+ * @author Rajiv Singla . Creation Date: 11/2/2016.
+ */
+public class DMaaPMRPublisherQueueImplTest extends BaseAnalyticsDMaaPUnitTest {
+
+
+ @Test
+ public void testAddBatchMessages() throws Exception {
+ DMaaPMRPublisherQueue publisherQueue = new DMaaPMRPublisherQueueImpl(10, 20);
+ // add two messages to batch queue
+ final int batchMessagesSizeAfterFirstInsert = publisherQueue.addBatchMessages(getTwoSampleMessages());
+ assertTrue("Batch Message Queue size must be 2", batchMessagesSizeAfterFirstInsert == 2);
+ // add two more message to batch queue
+ final int batchMessagesSizeAfterSecondInsert = publisherQueue.addBatchMessages(getTwoSampleMessages());
+ assertTrue("Batch Message Queue size must be 4", batchMessagesSizeAfterSecondInsert == 4);
+ // Now get all messages which must drain out batch queue
+ final List<String> messagesToPublish = publisherQueue.getMessageForPublishing();
+ assertTrue("There must be 4 messages to publish", messagesToPublish.size() == 4);
+ assertTrue("Batch Queue must be empty", publisherQueue.getBatchQueueRemainingSize() == 10);
+
+ }
+
+ @Test(expected = IllegalStateException.class)
+ public void testAddBatchMessagesWhenQueueSizeIsFull() throws Exception {
+ DMaaPMRPublisherQueue publisherQueue = new DMaaPMRPublisherQueueImpl(2, 20);
+ // add two messages to batch queue
+ final int batchMessagesSizeAfterFirstInsert = publisherQueue.addBatchMessages(getTwoSampleMessages());
+ assertTrue("Batch Message Queue size must be 2", batchMessagesSizeAfterFirstInsert == 2);
+ // add 2 more messages should now throw IllegalStateException
+ publisherQueue.addBatchMessages(getTwoSampleMessages());
+ }
+
+ @Test
+ public void testAddRecoverableMessages() throws Exception {
+ DMaaPMRPublisherQueue publisherQueue = new DMaaPMRPublisherQueueImpl(10, 20);
+ // add two messages to batch queue
+ final int batchMessagesSizeAfterFirstInsert = publisherQueue.addBatchMessages(getTwoSampleMessages());
+ assertTrue("Batch Message Queue size must be 2", batchMessagesSizeAfterFirstInsert == 2);
+ // add two recoverable messages
+ final int recoverableMessageSizeAfterFirstInsert =
+ publisherQueue.addRecoverableMessages(getTwoSampleMessages());
+ assertTrue("Recovery Message Queue size must be 2 after first insert",
+ recoverableMessageSizeAfterFirstInsert == 2);
+ // add two more recoverable messages
+ final int recoverableMessageSizeAfterSecondInsert =
+ publisherQueue.addRecoverableMessages(getTwoSampleMessages());
+ assertTrue("Recovery Message Queue size must be 4 after second insert",
+ recoverableMessageSizeAfterSecondInsert == 4);
+ // Now get all messages which must drain out batch queue
+ final List<String> messagesToPublish = publisherQueue.getMessageForPublishing();
+ assertTrue("There must be 6 messages to publish", messagesToPublish.size() == 6);
+ assertTrue("Batch Queue must be empty", publisherQueue.getBatchQueueRemainingSize() == 10);
+ assertTrue("Recovery Queue must be empty", publisherQueue.getRecoveryQueueRemainingSize() == 20);
+ }
+
+
+ @Test(expected = IllegalStateException.class)
+ public void testAddRecoverableMessagesWhenRecoveryQueueIsFull() throws Exception {
+ DMaaPMRPublisherQueue publisherQueue = new DMaaPMRPublisherQueueImpl(10, 2);
+ // add two messages to batch queue
+ final int batchMessagesSizeAfterFirstInsert = publisherQueue.addBatchMessages(getTwoSampleMessages());
+ assertTrue("Batch Message Queue size must be 2", batchMessagesSizeAfterFirstInsert == 2);
+ // add two recoverable messages
+ final int recoverableMessageSizeAfterFirstInsert =
+ publisherQueue.addRecoverableMessages(getTwoSampleMessages());
+ assertTrue("Recovery Message Queue size must be 2 after first insert",
+ recoverableMessageSizeAfterFirstInsert == 2);
+ // add two more recoverable messages which should throw IllegalStateException
+ publisherQueue.addRecoverableMessages(getTwoSampleMessages());
+ }
+
+ @Test
+ public void testGetMessageForPublishing() throws Exception {
+ DMaaPMRPublisherQueue publisherQueue = new DMaaPMRPublisherQueueImpl(10, 20);
+ // add two messages to batch queue
+ final int batchMessagesSizeAfterFirstInsert = publisherQueue.addBatchMessages(getTwoSampleMessages());
+ assertTrue("Batch Message Queue size must be 2", batchMessagesSizeAfterFirstInsert == 2);
+ // add two recoverable messages
+ final int recoverableMessageSizeAfterFirstInsert =
+ publisherQueue.addRecoverableMessages(getTwoSampleMessages());
+ assertTrue("Recovery Message Queue size must be 2 after first insert",
+ recoverableMessageSizeAfterFirstInsert == 2);
+ // add two more recoverable messages
+ final int recoverableMessageSizeAfterSecondInsert =
+ publisherQueue.addRecoverableMessages(getTwoSampleMessages());
+ assertTrue("Recovery Message Queue size must be 4 after second insert",
+ recoverableMessageSizeAfterSecondInsert == 4);
+ // Now get all messages which must drain out batch queue
+ final List<String> messagesToPublish = publisherQueue.getMessageForPublishing();
+ assertTrue("There must be 6 messages to publish", messagesToPublish.size() == 6);
+ // add two more batch and recovery messages
+ final int batchQueueSize = publisherQueue.addBatchMessages(getTwoSampleMessages());
+ final int recoveryQueueSize = publisherQueue.addRecoverableMessages(getTwoSampleMessages());
+ final int messagePublishCount = publisherQueue.getMessageForPublishing().size();
+ assertTrue("Batch Queue + Recovery Queue message total must batch publish message count",
+ messagePublishCount == (batchQueueSize + recoveryQueueSize));
+ assertTrue("Batch Queue must be empty", publisherQueue.getBatchQueueRemainingSize() == 10);
+ assertTrue("Recovery Queue must be empty", publisherQueue.getRecoveryQueueRemainingSize() == 20);
+
+ }
+
+ @Test
+ public void testGetBatchQueueRemainingSize() throws Exception {
+
+ DMaaPMRPublisherQueue publisherQueue = new DMaaPMRPublisherQueueImpl(10, 20);
+ // add two messages to batch queue
+ final int batchMessagesSizeAfterFirstInsert = publisherQueue.addBatchMessages(getTwoSampleMessages());
+ assertTrue("Batch Message Queue size must be 2", batchMessagesSizeAfterFirstInsert == 2);
+ assertTrue("Batch remaining capacity should be reduced by 2",
+ publisherQueue.getBatchQueueRemainingSize() == 8);
+
+ // add two recoverable messages
+ final int recoverableMessageSizeAfterFirstInsert =
+ publisherQueue.addRecoverableMessages(getTwoSampleMessages());
+ assertTrue("Recovery Message Queue size must be 2 after first insert",
+ recoverableMessageSizeAfterFirstInsert == 2);
+
+ // recoverable message should not change batch queue capacity
+ assertTrue("Adding recoverable Message must not have any impact on batch queue remaining capacity ",
+ publisherQueue.getBatchQueueRemainingSize() == 8);
+ // Now get all messages which must drain out batch queue
+ final List<String> messagesToPublish = publisherQueue.getMessageForPublishing();
+ assertTrue("There must be exactly 4 messages to publish", messagesToPublish.size() == 4);
+
+ // Batch queue remaining capacity should now match original batch size
+ assertTrue("Batch Queue remaining capacity must match original batch queue size", publisherQueue
+ .getBatchQueueRemainingSize() == 10);
+ }
+
+ @Test
+ public void testGetRecoveryQueueRemainingSize() throws Exception {
+ DMaaPMRPublisherQueue publisherQueue = new DMaaPMRPublisherQueueImpl(10, 20);
+
+ // add two recoverable messages
+ final int recoverableMessageSizeAfterFirstInsert =
+ publisherQueue.addRecoverableMessages(getTwoSampleMessages());
+ assertTrue("Recovery Message Queue size must be 2 after first insert",
+ recoverableMessageSizeAfterFirstInsert == 2);
+ assertTrue("Recovery Queue remaining capacity should be reduced by 2",
+ publisherQueue.getRecoveryQueueRemainingSize() == 18);
+
+ // add two messages to batch queue
+ final int batchMessagesSizeAfterFirstInsert = publisherQueue.addBatchMessages(getTwoSampleMessages());
+ assertTrue("Batch Message Queue size must be 2", batchMessagesSizeAfterFirstInsert == 2);
+
+ // batch message should not change recoverable queue capacity
+ assertTrue("Adding batch queue Message must not have any impact on recovery queue remaining capacity ",
+ publisherQueue.getRecoveryQueueRemainingSize() == 18);
+
+ // Now get all messages which must drain out recovery queue
+ final List<String> messagesToPublish = publisherQueue.getMessageForPublishing();
+ assertTrue("There must be exactly 4 messages to publish", messagesToPublish.size() == 4);
+
+ // Recoverable queue remaining capacity should now match original recovery queue size
+ assertTrue("Recoverable Queue remaining capacity must match original batch queue size", publisherQueue
+ .getRecoveryQueueRemainingSize() == 20);
+ }
+
+}
diff --git a/dcae-analytics-dmaap/src/test/java/org/openecomp/dcae/apod/analytics/dmaap/service/publisher/DMaaPMRPublisherQueueMockImpl.java b/dcae-analytics-dmaap/src/test/java/org/openecomp/dcae/apod/analytics/dmaap/service/publisher/DMaaPMRPublisherQueueMockImpl.java
new file mode 100644
index 0000000..ac7023b
--- /dev/null
+++ b/dcae-analytics-dmaap/src/test/java/org/openecomp/dcae/apod/analytics/dmaap/service/publisher/DMaaPMRPublisherQueueMockImpl.java
@@ -0,0 +1,53 @@
+/*
+ * ===============================LICENSE_START======================================
+ * dcae-analytics
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============================LICENSE_END===========================================
+ */
+
+package org.openecomp.dcae.apod.analytics.dmaap.service.publisher;
+
+import java.util.List;
+
+/**
+ * @author Manjesh Gowda. Creation Date: 11/7/2016.
+ */
+public class DMaaPMRPublisherQueueMockImpl implements DMaaPMRPublisherQueue {
+ @Override
+ public int addBatchMessages(List<String> batchMessages) throws IllegalStateException {
+ return 100;
+ }
+
+ @Override
+ public int addRecoverableMessages(List<String> recoverableMessages) throws IllegalStateException {
+ return 0;
+ }
+
+ @Override
+ public List<String> getMessageForPublishing() {
+ return null;
+ }
+
+ @Override
+ public int getBatchQueueRemainingSize() {
+ return 0;
+ }
+
+ @Override
+ public int getRecoveryQueueRemainingSize() {
+ return 0;
+ }
+}