diff options
Diffstat (limited to 'dcae-analytics-dmaap/src/test/java/org/onap/dcae/apod/analytics/dmaap/service')
7 files changed, 1017 insertions, 0 deletions
diff --git a/dcae-analytics-dmaap/src/test/java/org/onap/dcae/apod/analytics/dmaap/service/BaseDMaaPMRComponentTest.java b/dcae-analytics-dmaap/src/test/java/org/onap/dcae/apod/analytics/dmaap/service/BaseDMaaPMRComponentTest.java new file mode 100644 index 0000000..f83af60 --- /dev/null +++ b/dcae-analytics-dmaap/src/test/java/org/onap/dcae/apod/analytics/dmaap/service/BaseDMaaPMRComponentTest.java @@ -0,0 +1,300 @@ +/* + * ===============================LICENSE_START====================================== + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============================LICENSE_END=========================================== + */ + +package org.onap.dcae.apod.analytics.dmaap.service; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.google.common.base.Optional; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.http.HttpEntity; +import org.apache.http.HttpResponse; +import org.apache.http.StatusLine; +import org.apache.http.client.ResponseHandler; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.mockito.Mockito; +import org.onap.dcae.apod.analytics.common.exception.DCAEAnalyticsRuntimeException; +import org.onap.dcae.apod.analytics.dmaap.BaseAnalyticsDMaaPUnitTest; +import org.onap.dcae.apod.analytics.dmaap.domain.config.DMaaPMRPublisherConfig; +import org.onap.dcae.apod.analytics.dmaap.domain.config.DMaaPMRSubscriberConfig; +import org.onap.dcae.apod.analytics.dmaap.domain.response.DMaaPMRSubscriberResponse; +import org.onap.dcae.apod.analytics.dmaap.service.publisher.DMaaPMRPublisherQueue; + +import java.io.IOException; +import java.net.URI; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.isA; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; +import static org.mockito.BDDMockito.given; +import static org.mockito.Mockito.mock; + +/** + * @author Manjesh Gowda. Creation Date: 11/4/2016. + */ +public class BaseDMaaPMRComponentTest extends BaseAnalyticsDMaaPUnitTest { + + @Test + public void testGetAuthHeaderWithGoodValues() { + String expectedEncodedString = "Basic bTAwNTAyQHRjYS5hZi5kY2FlLmNvbTpUZTUwMjFhYmM="; + Optional<String> actualOutput = BaseDMaaPMRComponent.getAuthHeader("USER", "PASSWORD"); + assertTrue(" Authentication Header has value ", actualOutput.isPresent()); +// assertEquals(" Authentication Header has value ", expectedEncodedString, actualOutput.get()); + } + + @Test + public void testGetAuthHeaderWithNullValues() { + Optional<String> actualOutput = BaseDMaaPMRComponent.getAuthHeader(null, null); + assertFalse(" Authentication Header has value ", actualOutput.isPresent()); + } + + @Test + public void testGetAuthHeaderWithUserNullValue() { + Optional<String> actualOutput = BaseDMaaPMRComponent.getAuthHeader("USER", null); + assertFalse(" Authentication Header has value ", actualOutput.isPresent()); + } + + @Test + public void testGetAuthHeaderWithPasswordNullValue() { + Optional<String> actualOutput = BaseDMaaPMRComponent.getAuthHeader(null, "PASSWORD"); + assertFalse(" Authentication Header has value ", actualOutput.isPresent()); + } + + @Test + public void testCreatePublishURIWithGoodValues() { + URI actualURI = BaseDMaaPMRComponent.createPublisherURI(getPublisherConfig()); + String test = actualURI.toString(); + assertEquals("Generated Publisher URL is correct", + "https://testHostName:8080/events/testTopicName", actualURI.toString()); + } + + @Test(expected = DCAEAnalyticsRuntimeException.class) + public void testCreatePublishURIWithURISyntaxException() { + DMaaPMRPublisherConfig badPublisherConfig = new DMaaPMRPublisherConfig + .Builder(" dav /gh. ss/ asd ", "///@$%#-htps:<>!##") + .setPortNumber(0) + .setProtocol("https").build(); + + BaseDMaaPMRComponent.createPublisherURI(badPublisherConfig); + } + + @Test + public void testCreateSubscribeURIWithGoodValues() { + URI actualURI = BaseDMaaPMRComponent.createSubscriberURI( + getSubscriberConfig("test-consumer-group", "test-consumer-id")); + assertEquals("Generated Subscriber URL is correct", + "https://testHostName:8080/events/testTopicName/" + + "test-consumer-id/test-consumer-group?timeout=2000&limit=20", + actualURI.toString()); + } + + @Test(expected = DCAEAnalyticsRuntimeException.class) + public void testCreateSubscribeURIWithURISyntaxException() { + DMaaPMRSubscriberConfig badSubscriberConfig = new DMaaPMRSubscriberConfig + .Builder(" dav /gh. ss/ asd ", "") + .setPortNumber(PORT_NUMBER) + .setProtocol(HTTP_PROTOCOL) + .setContentType(CONTENT_TYPE).build(); + + URI actualURI = BaseDMaaPMRComponent.createSubscriberURI(badSubscriberConfig); + } + + @Test + public void testConvertToJsonStringGoodJsonStringList() { + List<String> jsonMessage = Arrays.asList( + "{\"message\":\"I'm Object 1 Message\"}", + "{\"message\":\"I'm Object 2 Message\"}"); + + String actualJSONMsg = BaseDMaaPMRComponent.convertToJsonString(jsonMessage); + + String expectedJSONMsg = "[{\"message\":\"I'm Object 1 Message\"}," + + "{\"message\":\"I'm Object 2 Message\"}]"; + assertEquals("Convert a List of Strings to JSON is working fine", expectedJSONMsg, actualJSONMsg); + + } + + @Rule + public ExpectedException expectedJsonProcessingException = ExpectedException.none(); + + @Test + public void testConvertToJsonStringBadJsonStringList() { + expectedJsonProcessingException.expect(DCAEAnalyticsRuntimeException.class); + expectedJsonProcessingException.expectCause(isA(JsonProcessingException.class)); + + List<String> jsonMessage = Arrays.asList( + "{\"message\":\"I'm Object 1 Message\"", + "\"message\":\"I'm Object 2 Message\""); + + BaseDMaaPMRComponent.convertToJsonString(jsonMessage); + } + + @Test + public void testConvertToJsonStringWithEmptyList() { + List<String> jsonMessage = Arrays.asList(); + String actualJSONMsg = BaseDMaaPMRComponent.convertToJsonString(jsonMessage); + String expectedJSONMsg = "[]"; + assertEquals("Convert a List of Strings to JSON is working fine", expectedJSONMsg, actualJSONMsg); + } + + @Test + public void testConvertToJsonStringWithNullList() { + String actualJSONMsg = BaseDMaaPMRComponent.convertToJsonString(null); + String expectedJSONMsg = "[]"; + assertEquals("Convert a List of Strings to JSON is working fine", expectedJSONMsg, actualJSONMsg); + } + + + @Test + public void testConvertJsonToStringMessagesGoodValues() { + String inputJSONMsg = "[{\"message\":\"I'm Object 1 Message\"}," + + "{\"message\":\"I'm Object 2 Message\"}]"; + List<String> actualList = BaseDMaaPMRComponent.convertJsonToStringMessages(inputJSONMsg); + assertThat(actualList, hasSize(2)); + assertThat(actualList, containsInAnyOrder( + "{\"message\":\"I'm Object 1 Message\"}", + "{\"message\":\"I'm Object 2 Message\"}" + )); + } + + @Test + public void testConvertJsonToStringMessagesNoValues() { + String inputJSONMsg = "[]"; + List<String> actualList = BaseDMaaPMRComponent.convertJsonToStringMessages(inputJSONMsg); + assertThat(actualList, hasSize(0)); + } + + @Test + public void testConvertJsonToStringMessagesNullValues() { + List<String> actualList = BaseDMaaPMRComponent.convertJsonToStringMessages(null); + assertThat(actualList, hasSize(0)); + } + + @Test + public void testConvertJsonToStringMessagesEmptyValues() { + List<String> actualList = BaseDMaaPMRComponent.convertJsonToStringMessages(" "); + assertThat(actualList, hasSize(0)); + } + + @Rule + public ExpectedException convertToJSONIOException = ExpectedException.none(); + + @Test + public void testConvertJsonToStringMessagesException() { + convertToJSONIOException.expect(DCAEAnalyticsRuntimeException.class); + convertToJSONIOException.expectCause(isA(IOException.class)); + + String inputJSONMsg = "[\"{\"message\":\"I'm Object 1 Message\"}\"," + + "\"{\"message\":\"I'm Object 2 Message\"}\"]"; + List<String> actualList = BaseDMaaPMRComponent.convertJsonToStringMessages(inputJSONMsg); + assertThat(actualList, hasSize(2)); + assertThat(actualList, containsInAnyOrder( + "{\"message\":\"I'm Object 1 Message\"}", + "{\"message\":\"I'm Object 2 Message\"}" + )); + } + + @Test + public void testAddMessagesToRecoveryQueueAllGood() { + DMaaPMRPublisherQueue dmaapMRPublisherQueue = mock(DMaaPMRPublisherQueue.class); + given(dmaapMRPublisherQueue.addRecoverableMessages(Mockito.<String>anyList())).willReturn(0); + given(dmaapMRPublisherQueue.getBatchQueueRemainingSize()).willReturn(0); + List<String> messages = new ArrayList<String>(); + BaseDMaaPMRComponent.addMessagesToRecoveryQueue(dmaapMRPublisherQueue, messages); + } + + @Rule + public ExpectedException addQueueIllegalException = ExpectedException.none(); + + @Test + public void testAddMessagesToRecoveryQueueException() { + addQueueIllegalException.expect(isA(DCAEAnalyticsRuntimeException.class)); + addQueueIllegalException.expectCause(isA(IllegalStateException.class)); + + DMaaPMRPublisherQueue dmaapMRPublisherQueue = mock(DMaaPMRPublisherQueue.class); + + given(dmaapMRPublisherQueue.addRecoverableMessages(Mockito.<String>anyList())) + .willThrow(IllegalStateException.class); + List<String> messages = new ArrayList<String>(); + + BaseDMaaPMRComponent.addMessagesToRecoveryQueue(dmaapMRPublisherQueue, messages); + } + + + @Test + public void testResponseHandler() { + HttpResponse mockHttpResponse = mock(HttpResponse.class); + StatusLine mockStatusLine = mock(StatusLine.class); + HttpEntity mockHttpEntity = mock(HttpEntity.class); + // Could not mock EntityUtils as it's final class + //EntityUtils mockEntityUtils = mock(EntityUtils.class); + + given(mockHttpResponse.getStatusLine()).willReturn(mockStatusLine); + given(mockStatusLine.getStatusCode()).willReturn(200); + given(mockHttpResponse.getEntity()).willReturn(null); + //given(mockEntityUtils.toString()).willReturn("Test value"); + + ResponseHandler<Pair<Integer, String>> responseHandler = BaseDMaaPMRComponent.responseHandler(); + try { + Pair<Integer, String> mappedResponse = responseHandler.handleResponse(mockHttpResponse); + assertTrue("Http response code returned properly ", mappedResponse.getLeft().equals(200)); + assertTrue("Http response body returned properly ", mappedResponse.getRight().equals("")); + } catch (IOException e) { + e.printStackTrace(); + } + } + + + @Test + public void testCreateSubscriberResponse() { + DMaaPMRSubscriberResponse dmaapMRSubscriberResponse = + BaseDMaaPMRComponent.createSubscriberResponse(200, "Test Message", getTwoSampleMessages()); + + assertThat(dmaapMRSubscriberResponse.getResponseCode(), is(200)); + assertEquals(dmaapMRSubscriberResponse.getResponseMessage(), "Test Message"); + assertThat(dmaapMRSubscriberResponse.getFetchedMessages().size(), is(2)); + + } + + @Test + public void testCreateSubscriberResponse_no_message() { + DMaaPMRSubscriberResponse dmaapMRSubscriberResponse = + BaseDMaaPMRComponent.createSubscriberResponse(200, "Test Message", null); + + assertThat(dmaapMRSubscriberResponse.getResponseCode(), is(200)); + assertEquals(dmaapMRSubscriberResponse.getResponseMessage(), "Test Message"); + assertThat(dmaapMRSubscriberResponse.getFetchedMessages().size(), is(0)); + + } + +} + + + + + diff --git a/dcae-analytics-dmaap/src/test/java/org/onap/dcae/apod/analytics/dmaap/service/publisher/DMaaPMRPublisherImplTest.java b/dcae-analytics-dmaap/src/test/java/org/onap/dcae/apod/analytics/dmaap/service/publisher/DMaaPMRPublisherImplTest.java new file mode 100644 index 0000000..c798743 --- /dev/null +++ b/dcae-analytics-dmaap/src/test/java/org/onap/dcae/apod/analytics/dmaap/service/publisher/DMaaPMRPublisherImplTest.java @@ -0,0 +1,210 @@ +/* + * ===============================LICENSE_START====================================== + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============================LICENSE_END=========================================== + */ + +package org.onap.dcae.apod.analytics.dmaap.service.publisher; + +import org.apache.commons.lang3.tuple.ImmutablePair; +import org.apache.http.client.ResponseHandler; +import org.apache.http.client.methods.HttpUriRequest; +import org.apache.http.impl.client.CloseableHttpClient; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.junit.MockitoJUnitRunner; +import org.onap.dcae.apod.analytics.common.exception.DCAEAnalyticsRuntimeException; +import org.onap.dcae.apod.analytics.dmaap.BaseAnalyticsDMaaPUnitTest; +import org.onap.dcae.apod.analytics.dmaap.domain.config.DMaaPMRPublisherConfig; +import org.onap.dcae.apod.analytics.dmaap.domain.response.DMaaPMRPublisherResponse; + +import java.io.IOException; +import java.util.ArrayList; + +import static org.hamcrest.CoreMatchers.isA; +import static org.hamcrest.core.Is.is; +import static org.junit.Assert.assertThat; +import static org.mockito.BDDMockito.given; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +/** + * @author Rajiv Singla . Creation Date: 10/21/2016. + */ +@RunWith(MockitoJUnitRunner.class) +public class DMaaPMRPublisherImplTest extends BaseAnalyticsDMaaPUnitTest { + + @Mock + private DMaaPMRPublisherQueueFactory dmaapMRPublisherQueueFactory; + @Mock + private CloseableHttpClient closeableHttpClient; + @Mock + private DMaaPMRPublisherQueue dmaapMRPublisherQueue; + + @Before + public void setUp() throws Exception { + given(dmaapMRPublisherQueueFactory.create(Mockito.anyInt(), Mockito.anyInt())) + .willReturn(dmaapMRPublisherQueue); + } + + @After + public void tearDown() throws Exception { + } + + @Test + public void testPublishSmallMessageList() throws Exception { + given(dmaapMRPublisherQueue.getBatchQueueRemainingSize()).willReturn(10); + given(dmaapMRPublisherQueue.addBatchMessages(Mockito.<String>anyList())).willReturn(2); + + DMaaPMRPublisherImpl dmaapMRPublisherImpl = new DMaaPMRPublisherImpl( + getPublisherConfig(), dmaapMRPublisherQueueFactory, closeableHttpClient); + + DMaaPMRPublisherResponse dmaapMRPublisherResponse = dmaapMRPublisherImpl.publish(getTwoSampleMessages()); + + assertThat(dmaapMRPublisherResponse.getResponseCode(), is(202)); + assertThat(dmaapMRPublisherResponse.getPendingMessagesCount(), is(2)); + assertThat(dmaapMRPublisherResponse.getResponseMessage(), + is("Accepted - Messages queued for batch publishing to MR Topic")); + } + + @Test + public void testPublishBigMessageList() throws Exception { + + given(dmaapMRPublisherQueue.getBatchQueueRemainingSize()).willReturn(0); + given(dmaapMRPublisherQueue.getMessageForPublishing()).willReturn(getTwoSampleMessages()); + Mockito.when( + closeableHttpClient.execute(Mockito.any(HttpUriRequest.class), Mockito.any(ResponseHandler.class))) + .thenReturn(new ImmutablePair<>(200, "Message successfully posted")); + + DMaaPMRPublisherImpl dmaapMRPublisherImpl = new DMaaPMRPublisherImpl( + getPublisherConfig(), dmaapMRPublisherQueueFactory, closeableHttpClient); + + DMaaPMRPublisherResponse dmaapMRPublisherResponse = dmaapMRPublisherImpl.publish(getTwoSampleMessages()); + + assertThat(dmaapMRPublisherResponse.getResponseCode(), is(200)); + assertThat(dmaapMRPublisherResponse.getPendingMessagesCount(), is(200)); + assertThat(dmaapMRPublisherResponse.getResponseMessage(), is("Message successfully posted")); + } + + @Test + public void testForcePublishSuccessful() throws Exception { + DMaaPMRPublisherConfig dmaapMRPublisherConfig = new + DMaaPMRPublisherConfig.Builder(HOST_NAME, TOPIC_NAME) + .setPortNumber(PORT_NUMBER) + .setProtocol(HTTP_PROTOCOL) + .setContentType(CONTENT_TYPE) + .setMaxRecoveryQueueSize(PUBLISHER_MAX_RECOVERY_QUEUE_SIZE) + .setMaxBatchSize(PUBLISHER_MAX_BATCH_QUEUE_SIZE).build(); + + Mockito.when(closeableHttpClient.execute( + Mockito.any(HttpUriRequest.class), Mockito.any(ResponseHandler.class))) + .thenReturn(new ImmutablePair<>(200, "Message successfully posted")); + + DMaaPMRPublisherImpl dmaapMRPublisherImpl = new DMaaPMRPublisherImpl( + dmaapMRPublisherConfig, dmaapMRPublisherQueueFactory, closeableHttpClient); + DMaaPMRPublisherResponse response = dmaapMRPublisherImpl.forcePublish(getTwoSampleMessages()); + assertThat(response.getResponseCode(), is(200)); + } + + @Test + public void testForcePublishFailure() throws Exception { + Mockito.when(closeableHttpClient.execute( + Mockito.any(HttpUriRequest.class), Mockito.any(ResponseHandler.class))) + .thenReturn(new ImmutablePair<>(503, "Message successfully posted")); + + DMaaPMRPublisherImpl dmaapMRPublisherImpl = new DMaaPMRPublisherImpl( + getPublisherConfig(), dmaapMRPublisherQueueFactory, closeableHttpClient); + DMaaPMRPublisherResponse response = dmaapMRPublisherImpl.forcePublish(getTwoSampleMessages()); + assertThat(response.getResponseCode(), is(503)); + } + + @Rule + public ExpectedException httpIOException = ExpectedException.none(); + + @Test + public void testForcePublishHttpFailure() throws Exception { + + httpIOException.expect(DCAEAnalyticsRuntimeException.class); + httpIOException.expectCause(isA(IOException.class)); + + given(closeableHttpClient.execute( + Mockito.any(HttpUriRequest.class), Mockito.any(ResponseHandler.class))).willThrow(IOException.class); + + DMaaPMRPublisherImpl dmaapMRPublisherImpl = new DMaaPMRPublisherImpl( + getPublisherConfig(), dmaapMRPublisherQueueFactory, closeableHttpClient); + dmaapMRPublisherImpl.forcePublish(getTwoSampleMessages()); + } + + @Test + public void testFlushSuccessful() throws Exception { + Mockito.when(closeableHttpClient.execute( + Mockito.any(HttpUriRequest.class), Mockito.any(ResponseHandler.class))) + .thenReturn(new ImmutablePair<>(200, "Message successfully posted")); + + Mockito.when(dmaapMRPublisherQueue.getMessageForPublishing()).thenReturn(getTwoSampleMessages()); + + DMaaPMRPublisherImpl dmaapMRPublisherImpl = new DMaaPMRPublisherImpl( + getPublisherConfig(), dmaapMRPublisherQueueFactory, closeableHttpClient); + DMaaPMRPublisherResponse response = dmaapMRPublisherImpl.flush(); + assertThat(response.getResponseCode(), is(200)); + } + + @Test + public void testFlushEmptyList() throws Exception { + Mockito.when(dmaapMRPublisherQueue.getMessageForPublishing()).thenReturn(new ArrayList<String>()); + + DMaaPMRPublisherImpl dmaapMRPublisherImpl = new DMaaPMRPublisherImpl( + getPublisherConfig(), dmaapMRPublisherQueueFactory, closeableHttpClient); + DMaaPMRPublisherResponse response = dmaapMRPublisherImpl.flush(); + assertThat(response.getResponseCode(), is(204)); + } + + @Test + public void testClose() throws Exception { + Mockito.when(dmaapMRPublisherQueue.getMessageForPublishing()).thenReturn(new ArrayList<String>()); + Mockito.when(closeableHttpClient.execute( + Mockito.any(HttpUriRequest.class), Mockito.any(ResponseHandler.class))) + .thenReturn(new ImmutablePair<>(200, "Message successfully posted")); + Mockito.when(dmaapMRPublisherQueue.getMessageForPublishing()).thenReturn(getTwoSampleMessages()); + + DMaaPMRPublisherImpl dmaapMRPublisherImpl = new DMaaPMRPublisherImpl( + getPublisherConfig(), dmaapMRPublisherQueueFactory, closeableHttpClient); + dmaapMRPublisherImpl.close(); + verify(closeableHttpClient).execute(Mockito.any(HttpUriRequest.class), Mockito.any(ResponseHandler.class)); + } + + @Test + public void testCloseUnsuccessful() throws Exception { + Mockito.when(dmaapMRPublisherQueue.getMessageForPublishing()).thenReturn(new ArrayList<String>()); + Mockito.when(closeableHttpClient.execute( + Mockito.any(HttpUriRequest.class), Mockito.any(ResponseHandler.class))) + .thenReturn(new ImmutablePair<>(400, "Message successfully posted")); + Mockito.when(dmaapMRPublisherQueue.getMessageForPublishing()).thenReturn(getTwoSampleMessages()); + + DMaaPMRPublisherImpl dmaapMRPublisherImpl = new DMaaPMRPublisherImpl( + getPublisherConfig(), dmaapMRPublisherQueueFactory, closeableHttpClient); + dmaapMRPublisherImpl.close(); + verify(closeableHttpClient, times(6)).execute(Mockito.any(HttpUriRequest.class), + Mockito.any(ResponseHandler.class)); + } +} diff --git a/dcae-analytics-dmaap/src/test/java/org/onap/dcae/apod/analytics/dmaap/service/publisher/DMaaPMRPublisherMockImpl.java b/dcae-analytics-dmaap/src/test/java/org/onap/dcae/apod/analytics/dmaap/service/publisher/DMaaPMRPublisherMockImpl.java new file mode 100644 index 0000000..3e84430 --- /dev/null +++ b/dcae-analytics-dmaap/src/test/java/org/onap/dcae/apod/analytics/dmaap/service/publisher/DMaaPMRPublisherMockImpl.java @@ -0,0 +1,59 @@ +/* + * ===============================LICENSE_START====================================== + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============================LICENSE_END=========================================== + */ + +package org.onap.dcae.apod.analytics.dmaap.service.publisher; + +import org.onap.dcae.apod.analytics.common.exception.DCAEAnalyticsRuntimeException; +import org.onap.dcae.apod.analytics.dmaap.domain.response.DMaaPMRPublisherResponse; +import org.onap.dcae.apod.analytics.dmaap.domain.response.DMaaPMRPublisherResponseImpl; + +import java.util.Date; +import java.util.List; + +/** + * @author Rajiv Singla . Creation Date: 10/21/2016. + */ +public class DMaaPMRPublisherMockImpl implements DMaaPMRPublisher { + + @Override + public DMaaPMRPublisherResponse publish(List<String> messages) throws DCAEAnalyticsRuntimeException { + return new DMaaPMRPublisherResponseImpl(102, "Mock Response", 100); + } + + @Override + public DMaaPMRPublisherResponse forcePublish(List<String> messages) throws DCAEAnalyticsRuntimeException { + return null; + } + + @Override + public DMaaPMRPublisherResponse flush() { + return null; + } + + @Override + public Date getPublisherCreationTime() { + return null; + } + + @Override + public void close() throws Exception { + + } +} diff --git a/dcae-analytics-dmaap/src/test/java/org/onap/dcae/apod/analytics/dmaap/service/publisher/DMaaPMRPublisherQueueImplTest.java b/dcae-analytics-dmaap/src/test/java/org/onap/dcae/apod/analytics/dmaap/service/publisher/DMaaPMRPublisherQueueImplTest.java new file mode 100644 index 0000000..d5d0b7b --- /dev/null +++ b/dcae-analytics-dmaap/src/test/java/org/onap/dcae/apod/analytics/dmaap/service/publisher/DMaaPMRPublisherQueueImplTest.java @@ -0,0 +1,189 @@ +/* + * ===============================LICENSE_START====================================== + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============================LICENSE_END=========================================== + */ + +package org.onap.dcae.apod.analytics.dmaap.service.publisher; + +import org.junit.Test; +import org.onap.dcae.apod.analytics.dmaap.BaseAnalyticsDMaaPUnitTest; + +import java.util.List; + +import static org.junit.Assert.assertTrue; + +/** + * + * @author Rajiv Singla . Creation Date: 11/2/2016. + */ +public class DMaaPMRPublisherQueueImplTest extends BaseAnalyticsDMaaPUnitTest { + + + @Test + public void testAddBatchMessages() throws Exception { + DMaaPMRPublisherQueue publisherQueue = new DMaaPMRPublisherQueueImpl(10, 20); + // add two messages to batch queue + final int batchMessagesSizeAfterFirstInsert = publisherQueue.addBatchMessages(getTwoSampleMessages()); + assertTrue("Batch Message Queue size must be 2", batchMessagesSizeAfterFirstInsert == 2); + // add two more message to batch queue + final int batchMessagesSizeAfterSecondInsert = publisherQueue.addBatchMessages(getTwoSampleMessages()); + assertTrue("Batch Message Queue size must be 4", batchMessagesSizeAfterSecondInsert == 4); + // Now get all messages which must drain out batch queue + final List<String> messagesToPublish = publisherQueue.getMessageForPublishing(); + assertTrue("There must be 4 messages to publish", messagesToPublish.size() == 4); + assertTrue("Batch Queue must be empty", publisherQueue.getBatchQueueRemainingSize() == 10); + + } + + @Test(expected = IllegalStateException.class) + public void testAddBatchMessagesWhenQueueSizeIsFull() throws Exception { + DMaaPMRPublisherQueue publisherQueue = new DMaaPMRPublisherQueueImpl(2, 20); + // add two messages to batch queue + final int batchMessagesSizeAfterFirstInsert = publisherQueue.addBatchMessages(getTwoSampleMessages()); + assertTrue("Batch Message Queue size must be 2", batchMessagesSizeAfterFirstInsert == 2); + // add 2 more messages should now throw IllegalStateException + publisherQueue.addBatchMessages(getTwoSampleMessages()); + } + + @Test + public void testAddRecoverableMessages() throws Exception { + DMaaPMRPublisherQueue publisherQueue = new DMaaPMRPublisherQueueImpl(10, 20); + // add two messages to batch queue + final int batchMessagesSizeAfterFirstInsert = publisherQueue.addBatchMessages(getTwoSampleMessages()); + assertTrue("Batch Message Queue size must be 2", batchMessagesSizeAfterFirstInsert == 2); + // add two recoverable messages + final int recoverableMessageSizeAfterFirstInsert = + publisherQueue.addRecoverableMessages(getTwoSampleMessages()); + assertTrue("Recovery Message Queue size must be 2 after first insert", + recoverableMessageSizeAfterFirstInsert == 2); + // add two more recoverable messages + final int recoverableMessageSizeAfterSecondInsert = + publisherQueue.addRecoverableMessages(getTwoSampleMessages()); + assertTrue("Recovery Message Queue size must be 4 after second insert", + recoverableMessageSizeAfterSecondInsert == 4); + // Now get all messages which must drain out batch queue + final List<String> messagesToPublish = publisherQueue.getMessageForPublishing(); + assertTrue("There must be 6 messages to publish", messagesToPublish.size() == 6); + assertTrue("Batch Queue must be empty", publisherQueue.getBatchQueueRemainingSize() == 10); + assertTrue("Recovery Queue must be empty", publisherQueue.getRecoveryQueueRemainingSize() == 20); + } + + + @Test(expected = IllegalStateException.class) + public void testAddRecoverableMessagesWhenRecoveryQueueIsFull() throws Exception { + DMaaPMRPublisherQueue publisherQueue = new DMaaPMRPublisherQueueImpl(10, 2); + // add two messages to batch queue + final int batchMessagesSizeAfterFirstInsert = publisherQueue.addBatchMessages(getTwoSampleMessages()); + assertTrue("Batch Message Queue size must be 2", batchMessagesSizeAfterFirstInsert == 2); + // add two recoverable messages + final int recoverableMessageSizeAfterFirstInsert = + publisherQueue.addRecoverableMessages(getTwoSampleMessages()); + assertTrue("Recovery Message Queue size must be 2 after first insert", + recoverableMessageSizeAfterFirstInsert == 2); + // add two more recoverable messages which should throw IllegalStateException + publisherQueue.addRecoverableMessages(getTwoSampleMessages()); + } + + @Test + public void testGetMessageForPublishing() throws Exception { + DMaaPMRPublisherQueue publisherQueue = new DMaaPMRPublisherQueueImpl(10, 20); + // add two messages to batch queue + final int batchMessagesSizeAfterFirstInsert = publisherQueue.addBatchMessages(getTwoSampleMessages()); + assertTrue("Batch Message Queue size must be 2", batchMessagesSizeAfterFirstInsert == 2); + // add two recoverable messages + final int recoverableMessageSizeAfterFirstInsert = + publisherQueue.addRecoverableMessages(getTwoSampleMessages()); + assertTrue("Recovery Message Queue size must be 2 after first insert", + recoverableMessageSizeAfterFirstInsert == 2); + // add two more recoverable messages + final int recoverableMessageSizeAfterSecondInsert = + publisherQueue.addRecoverableMessages(getTwoSampleMessages()); + assertTrue("Recovery Message Queue size must be 4 after second insert", + recoverableMessageSizeAfterSecondInsert == 4); + // Now get all messages which must drain out batch queue + final List<String> messagesToPublish = publisherQueue.getMessageForPublishing(); + assertTrue("There must be 6 messages to publish", messagesToPublish.size() == 6); + // add two more batch and recovery messages + final int batchQueueSize = publisherQueue.addBatchMessages(getTwoSampleMessages()); + final int recoveryQueueSize = publisherQueue.addRecoverableMessages(getTwoSampleMessages()); + final int messagePublishCount = publisherQueue.getMessageForPublishing().size(); + assertTrue("Batch Queue + Recovery Queue message total must batch publish message count", + messagePublishCount == (batchQueueSize + recoveryQueueSize)); + assertTrue("Batch Queue must be empty", publisherQueue.getBatchQueueRemainingSize() == 10); + assertTrue("Recovery Queue must be empty", publisherQueue.getRecoveryQueueRemainingSize() == 20); + + } + + @Test + public void testGetBatchQueueRemainingSize() throws Exception { + + DMaaPMRPublisherQueue publisherQueue = new DMaaPMRPublisherQueueImpl(10, 20); + // add two messages to batch queue + final int batchMessagesSizeAfterFirstInsert = publisherQueue.addBatchMessages(getTwoSampleMessages()); + assertTrue("Batch Message Queue size must be 2", batchMessagesSizeAfterFirstInsert == 2); + assertTrue("Batch remaining capacity should be reduced by 2", + publisherQueue.getBatchQueueRemainingSize() == 8); + + // add two recoverable messages + final int recoverableMessageSizeAfterFirstInsert = + publisherQueue.addRecoverableMessages(getTwoSampleMessages()); + assertTrue("Recovery Message Queue size must be 2 after first insert", + recoverableMessageSizeAfterFirstInsert == 2); + + // recoverable message should not change batch queue capacity + assertTrue("Adding recoverable Message must not have any impact on batch queue remaining capacity ", + publisherQueue.getBatchQueueRemainingSize() == 8); + // Now get all messages which must drain out batch queue + final List<String> messagesToPublish = publisherQueue.getMessageForPublishing(); + assertTrue("There must be exactly 4 messages to publish", messagesToPublish.size() == 4); + + // Batch queue remaining capacity should now match original batch size + assertTrue("Batch Queue remaining capacity must match original batch queue size", publisherQueue + .getBatchQueueRemainingSize() == 10); + } + + @Test + public void testGetRecoveryQueueRemainingSize() throws Exception { + DMaaPMRPublisherQueue publisherQueue = new DMaaPMRPublisherQueueImpl(10, 20); + + // add two recoverable messages + final int recoverableMessageSizeAfterFirstInsert = + publisherQueue.addRecoverableMessages(getTwoSampleMessages()); + assertTrue("Recovery Message Queue size must be 2 after first insert", + recoverableMessageSizeAfterFirstInsert == 2); + assertTrue("Recovery Queue remaining capacity should be reduced by 2", + publisherQueue.getRecoveryQueueRemainingSize() == 18); + + // add two messages to batch queue + final int batchMessagesSizeAfterFirstInsert = publisherQueue.addBatchMessages(getTwoSampleMessages()); + assertTrue("Batch Message Queue size must be 2", batchMessagesSizeAfterFirstInsert == 2); + + // batch message should not change recoverable queue capacity + assertTrue("Adding batch queue Message must not have any impact on recovery queue remaining capacity ", + publisherQueue.getRecoveryQueueRemainingSize() == 18); + + // Now get all messages which must drain out recovery queue + final List<String> messagesToPublish = publisherQueue.getMessageForPublishing(); + assertTrue("There must be exactly 4 messages to publish", messagesToPublish.size() == 4); + + // Recoverable queue remaining capacity should now match original recovery queue size + assertTrue("Recoverable Queue remaining capacity must match original batch queue size", publisherQueue + .getRecoveryQueueRemainingSize() == 20); + } + +} diff --git a/dcae-analytics-dmaap/src/test/java/org/onap/dcae/apod/analytics/dmaap/service/publisher/DMaaPMRPublisherQueueMockImpl.java b/dcae-analytics-dmaap/src/test/java/org/onap/dcae/apod/analytics/dmaap/service/publisher/DMaaPMRPublisherQueueMockImpl.java new file mode 100644 index 0000000..7af0454 --- /dev/null +++ b/dcae-analytics-dmaap/src/test/java/org/onap/dcae/apod/analytics/dmaap/service/publisher/DMaaPMRPublisherQueueMockImpl.java @@ -0,0 +1,53 @@ +/* + * ===============================LICENSE_START====================================== + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============================LICENSE_END=========================================== + */ + +package org.onap.dcae.apod.analytics.dmaap.service.publisher; + +import java.util.List; + +/** + * @author Manjesh Gowda. Creation Date: 11/7/2016. + */ +public class DMaaPMRPublisherQueueMockImpl implements DMaaPMRPublisherQueue { + @Override + public int addBatchMessages(List<String> batchMessages) throws IllegalStateException { + return 100; + } + + @Override + public int addRecoverableMessages(List<String> recoverableMessages) throws IllegalStateException { + return 0; + } + + @Override + public List<String> getMessageForPublishing() { + return null; + } + + @Override + public int getBatchQueueRemainingSize() { + return 0; + } + + @Override + public int getRecoveryQueueRemainingSize() { + return 0; + } +} diff --git a/dcae-analytics-dmaap/src/test/java/org/onap/dcae/apod/analytics/dmaap/service/subscriber/DMaaPMRSubscriberImplTest.java b/dcae-analytics-dmaap/src/test/java/org/onap/dcae/apod/analytics/dmaap/service/subscriber/DMaaPMRSubscriberImplTest.java new file mode 100644 index 0000000..892cbd0 --- /dev/null +++ b/dcae-analytics-dmaap/src/test/java/org/onap/dcae/apod/analytics/dmaap/service/subscriber/DMaaPMRSubscriberImplTest.java @@ -0,0 +1,158 @@ +/* + * ===============================LICENSE_START====================================== + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============================LICENSE_END=========================================== + */ + +package org.onap.dcae.apod.analytics.dmaap.service.subscriber; + +import com.jayway.jsonassert.impl.matcher.IsCollectionWithSize; +import org.apache.commons.lang3.tuple.ImmutablePair; +import org.apache.http.client.ResponseHandler; +import org.apache.http.client.methods.HttpUriRequest; +import org.apache.http.impl.client.CloseableHttpClient; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.junit.MockitoJUnitRunner; +import org.onap.dcae.apod.analytics.common.exception.DCAEAnalyticsRuntimeException; +import org.onap.dcae.apod.analytics.dmaap.BaseAnalyticsDMaaPUnitTest; +import org.onap.dcae.apod.analytics.dmaap.domain.config.DMaaPMRSubscriberConfig; +import org.onap.dcae.apod.analytics.dmaap.domain.response.DMaaPMRSubscriberResponse; + +import java.io.IOException; +import java.util.Random; +import java.util.UUID; + +import static org.hamcrest.CoreMatchers.isA; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.core.Is.is; +import static org.mockito.BDDMockito.given; + +/** + * @author Rajiv Singla . Creation Date: 10/21/2016. + */ +@RunWith(MockitoJUnitRunner.class) +public class DMaaPMRSubscriberImplTest extends BaseAnalyticsDMaaPUnitTest { + + @Mock + private CloseableHttpClient closeableHttpClient; + + private String consumerGroup, consumerId; + + @Before + public void setUp() throws Exception { + Random random = new Random(10000L); + consumerGroup = "Test-Consumer-Group" + Long.toString(random.nextLong()); + consumerId = UUID.randomUUID().toString(); + } + + @After + public void tearDown() throws Exception { + + } + + @Test + public void testSubscriberSuccessfullyReceiveDmaapMessage() throws Exception { + + String testMessages = "[{\"message\":\"I'm Object 1 Message\"}," + + "{\"message\":\"I'm Object 2 Message\"}]"; + Mockito.when( + closeableHttpClient.execute(Mockito.any(HttpUriRequest.class), Mockito.any(ResponseHandler.class))) + .thenReturn(new ImmutablePair<>(200, testMessages)); + + DMaaPMRSubscriberImpl dmaapMRSubscriberImpl = new DMaaPMRSubscriberImpl( + getSubscriberConfig(consumerId, consumerGroup), closeableHttpClient); + DMaaPMRSubscriberResponse dmaapMRSubscriberResponse = dmaapMRSubscriberImpl.fetchMessages(); + assertThat(dmaapMRSubscriberResponse.getResponseCode(), is(200)); + assertThat(dmaapMRSubscriberResponse.getFetchedMessages(), IsCollectionWithSize.hasSize(2)); + } + + @Test + public void testSubscriberSuccessfullyReceiveDmaapMessageWithNoUsername() throws Exception { + + DMaaPMRSubscriberConfig dmaapMRSubscriberConfig = new DMaaPMRSubscriberConfig.Builder(HOST_NAME, TOPIC_NAME) + .setPortNumber(PORT_NUMBER) + .setProtocol(HTTP_PROTOCOL) + .setContentType(CONTENT_TYPE) + .setConsumerGroup(consumerGroup != null ? consumerGroup : SUBSCRIBER_CONSUMER_GROUP_NAME) + .setConsumerId(consumerId != null ? consumerId : SUBSCRIBER_CONSUMER_ID) + .setTimeoutMS(SUBSCRIBER_TIMEOUT_MS) + .setMessageLimit(SUBSCRIBER_MESSAGE_LIMIT).build(); + + String testMessages = "[{\"message\":\"I'm Object 1 Message\"}," + + "{\"message\":\"I'm Object 2 Message\"}]"; + Mockito.when( + closeableHttpClient.execute(Mockito.any(HttpUriRequest.class), Mockito.any(ResponseHandler.class))) + .thenReturn(new ImmutablePair<>(200, testMessages)); + + DMaaPMRSubscriberImpl dmaapMRSubscriberImpl = new DMaaPMRSubscriberImpl( + dmaapMRSubscriberConfig, closeableHttpClient); + DMaaPMRSubscriberResponse dmaapMRSubscriberResponse = dmaapMRSubscriberImpl.fetchMessages(); + assertThat(dmaapMRSubscriberResponse.getResponseCode(), is(200)); + assertThat(dmaapMRSubscriberResponse.getFetchedMessages(), IsCollectionWithSize.hasSize(2)); + } + + @Test + public void testSubscriberSuccessfullyReceiveNoDmaapMessage() throws Exception { + Mockito.when( + closeableHttpClient.execute(Mockito.any(HttpUriRequest.class), Mockito.any(ResponseHandler.class))) + .thenReturn(new ImmutablePair<>(200, null)); + + DMaaPMRSubscriberImpl dmaapMRSubscriberImpl = new DMaaPMRSubscriberImpl( + getSubscriberConfig(consumerId, consumerGroup), closeableHttpClient); + DMaaPMRSubscriberResponse dmaapMRSubscriberResponse = dmaapMRSubscriberImpl.fetchMessages(); + assertThat(dmaapMRSubscriberResponse.getResponseCode(), is(200)); + assertThat(dmaapMRSubscriberResponse.getFetchedMessages(), IsCollectionWithSize.hasSize(0)); + } + + @Test + public void testSubscriberSuccessfullyReceiveErrorMessage() throws Exception { + Mockito.when( + closeableHttpClient.execute(Mockito.any(HttpUriRequest.class), Mockito.any(ResponseHandler.class))) + .thenReturn(new ImmutablePair<>(400, "Bad Request")); + + DMaaPMRSubscriberImpl dmaapMRSubscriberImpl = new DMaaPMRSubscriberImpl( + getSubscriberConfig(consumerId, consumerGroup), closeableHttpClient); + DMaaPMRSubscriberResponse dmaapMRSubscriberResponse = dmaapMRSubscriberImpl.fetchMessages(); + assertThat(dmaapMRSubscriberResponse.getResponseCode(), is(400)); + assertThat(dmaapMRSubscriberResponse.getFetchedMessages(), IsCollectionWithSize.hasSize(0)); + } + + @Rule + public ExpectedException httpIOException = ExpectedException.none(); + + @Test + public void testSubscriberSuccessfullyReceiveException() throws Exception { + + httpIOException.expect(DCAEAnalyticsRuntimeException.class); + httpIOException.expectCause(isA(IOException.class)); + + given(closeableHttpClient.execute( + Mockito.any(HttpUriRequest.class), Mockito.any(ResponseHandler.class))).willThrow(IOException.class); + + DMaaPMRSubscriberImpl dmaapMRSubscriberImpl = new DMaaPMRSubscriberImpl( + getSubscriberConfig(consumerId, consumerGroup), closeableHttpClient); + dmaapMRSubscriberImpl.fetchMessages(); + } + +} diff --git a/dcae-analytics-dmaap/src/test/java/org/onap/dcae/apod/analytics/dmaap/service/subscriber/DMaaPMRSubscriberMockImpl.java b/dcae-analytics-dmaap/src/test/java/org/onap/dcae/apod/analytics/dmaap/service/subscriber/DMaaPMRSubscriberMockImpl.java new file mode 100644 index 0000000..6f348a5 --- /dev/null +++ b/dcae-analytics-dmaap/src/test/java/org/onap/dcae/apod/analytics/dmaap/service/subscriber/DMaaPMRSubscriberMockImpl.java @@ -0,0 +1,48 @@ +/* + * ===============================LICENSE_START====================================== + * dcae-analytics + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============================LICENSE_END=========================================== + */ + +package org.onap.dcae.apod.analytics.dmaap.service.subscriber; + +import org.onap.dcae.apod.analytics.common.exception.DCAEAnalyticsRuntimeException; +import org.onap.dcae.apod.analytics.dmaap.domain.response.DMaaPMRSubscriberResponse; +import org.onap.dcae.apod.analytics.dmaap.domain.response.DMaaPMRSubscriberResponseImpl; + +import java.util.Date; + +/** + * @author Rajiv Singla . Creation Date: 10/21/2016. + */ +public class DMaaPMRSubscriberMockImpl implements DMaaPMRSubscriber { + + @Override + public DMaaPMRSubscriberResponse fetchMessages() throws DCAEAnalyticsRuntimeException { + return new DMaaPMRSubscriberResponseImpl(102, "Mock Response", null); + } + + @Override + public Date getSubscriberCreationTime() { + return null; + } + + @Override + public void close() throws Exception { + + } +} |