aboutsummaryrefslogtreecommitdiffstats
path: root/a1-policy-management/src/test/java
diff options
context:
space:
mode:
authorPatrikBuhr <patrik.buhr@est.tech>2020-10-14 14:14:06 +0200
committerPatrikBuhr <patrik.buhr@est.tech>2020-10-19 08:34:58 +0200
commitf86407dfdf0e04979a6765da4eb13f9983e1150e (patch)
treed5a2fb87cfbbd062bf0e3d613cc8a0ee04e7b18d /a1-policy-management/src/test/java
parentdf373ec4c902a2596dd2dfe957425af1e3113b17 (diff)
Made DmaapMessageConsumer asynchronuous
Change-Id: Ib3d4951f3f9b2061353b5e50f427559a3781b10e Issue-ID: CCSDK-2502 Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
Diffstat (limited to 'a1-policy-management/src/test/java')
-rw-r--r--a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/dmaap/DmaapMessageConsumerTest.java190
-rw-r--r--a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/dmaap/DmaapMessageHandlerTest.java15
-rw-r--r--a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RefreshConfigTaskTest.java11
3 files changed, 79 insertions, 137 deletions
diff --git a/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/dmaap/DmaapMessageConsumerTest.java b/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/dmaap/DmaapMessageConsumerTest.java
index b6d3cc07..72ca84a5 100644
--- a/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/dmaap/DmaapMessageConsumerTest.java
+++ b/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/dmaap/DmaapMessageConsumerTest.java
@@ -20,36 +20,26 @@
package org.onap.ccsdk.oran.a1policymanagementservice.dmaap;
-import static ch.qos.logback.classic.Level.WARN;
-import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.doNothing;
+import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
-import ch.qos.logback.classic.spi.ILoggingEvent;
-import ch.qos.logback.core.read.ListAppender;
-
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.JsonObject;
-import java.time.Duration;
-import java.util.LinkedList;
-import java.util.List;
+import java.util.ArrayList;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
-import org.mockito.ArgumentCaptor;
-import org.mockito.InOrder;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.onap.ccsdk.oran.a1policymanagementservice.clients.AsyncRestClient;
@@ -57,8 +47,8 @@ import org.onap.ccsdk.oran.a1policymanagementservice.configuration.ApplicationCo
import org.onap.ccsdk.oran.a1policymanagementservice.dmaap.DmaapRequestMessage.Operation;
import org.onap.ccsdk.oran.a1policymanagementservice.exceptions.ServiceException;
import org.onap.ccsdk.oran.a1policymanagementservice.utils.LoggingUtils;
-import org.springframework.http.HttpStatus;
-import org.springframework.http.ResponseEntity;
+
+import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@ExtendWith(MockitoExtension.class)
@@ -79,151 +69,113 @@ class DmaapMessageConsumerTest {
LoggingUtils.getLogListAppender(DmaapMessageConsumer.class);
}
- @Test
- void dmaapNotConfigured_thenSleepAndRetryUntilConfig() throws Exception {
- messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock));
-
- doNothing().when(messageConsumerUnderTest).sleep(any(Duration.class));
- doReturn(false, false, false, true).when(messageConsumerUnderTest).isStopped();
- doReturn(false, true, true).when(messageConsumerUnderTest).isDmaapConfigured();
- doReturn(new LinkedList<>()).when(messageConsumerUnderTest).fetchAllMessages();
-
- messageConsumerUnderTest.start().join();
-
- InOrder orderVerifier = inOrder(messageConsumerUnderTest);
- orderVerifier.verify(messageConsumerUnderTest).sleep(DmaapMessageConsumer.TIME_BETWEEN_DMAAP_RETRIES);
- orderVerifier.verify(messageConsumerUnderTest).fetchAllMessages();
+ private void setTaskNumberOfLoops(int number) {
+ ArrayList<Integer> l = new ArrayList<>();
+ for (int i = 0; i < number; ++i) {
+ l.add(i);
+ }
+ Flux<Integer> f = Flux.fromIterable(l);
+ doReturn(f).when(messageConsumerUnderTest).infiniteFlux();
}
- @Test
- void dmaapConfigurationRemoved_thenStopPollingDmaapSleepAndRetry() throws Exception {
- messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock));
-
- doNothing().when(messageConsumerUnderTest).sleep(any(Duration.class));
- doReturn(false, false, false, false, true).when(messageConsumerUnderTest).isStopped();
- doReturn(true, true, false).when(messageConsumerUnderTest).isDmaapConfigured();
- doReturn(new LinkedList<>()).when(messageConsumerUnderTest).fetchAllMessages();
-
- messageConsumerUnderTest.start().join();
-
- InOrder orderVerifier = inOrder(messageConsumerUnderTest);
- orderVerifier.verify(messageConsumerUnderTest).fetchAllMessages();
- orderVerifier.verify(messageConsumerUnderTest).sleep(DmaapMessageConsumer.TIME_BETWEEN_DMAAP_RETRIES);
+ private void disableTaskDelay() {
+ doReturn(Mono.empty()).when(messageConsumerUnderTest).delay();
}
@Test
- void dmaapConfiguredAndNoMessages_thenPollOnce() throws Exception {
- setUpMrConfig();
-
+ void successfulCase_dmaapNotConfigured_thenSleepAndRetryUntilConfig() throws Exception {
messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock));
- Mono<ResponseEntity<String>> response = Mono.empty();
+ setTaskNumberOfLoops(3);
+ disableTaskDelay();
- doReturn(false, true).when(messageConsumerUnderTest).isStopped();
- doReturn(messageRouterConsumerMock).when(messageConsumerUnderTest).getMessageRouterConsumer();
- doReturn(response).when(messageRouterConsumerMock).getForEntity(any());
+ when(this.applicationConfigMock.getDmaapConsumerTopicUrl()).thenReturn("getDmaapConsumerTopicUrl");
+ doReturn(false, false, true).when(messageConsumerUnderTest).isDmaapConfigured();
+ doReturn(Mono.just(dmaapRequestMessageString())).when(messageConsumerUnderTest)
+ .getFromMessageRouter(anyString());
- messageConsumerUnderTest.start().join();
+ doReturn(Mono.just("responseFromHandler")).when(messageConsumerUnderTest).handleDmaapMsg(any());
- verify(messageRouterConsumerMock).getForEntity(any());
- verifyNoMoreInteractions(messageRouterConsumerMock);
+ String s = messageConsumerUnderTest.createTask().blockLast();
+ assertEquals("responseFromHandler", s);
+ verify(messageConsumerUnderTest, times(2)).delay();
+ verify(messageConsumerUnderTest, times(1)).handleDmaapMsg(dmaapRequestMessage());
}
@Test
- void dmaapConfiguredAndErrorGettingMessages_thenLogWarningAndSleep() throws Exception {
- setUpMrConfig();
-
+ void returnErrorFromDmapp_thenSleepAndRetry() throws Exception {
messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock));
- doNothing().when(messageConsumerUnderTest).sleep(any(Duration.class));
- doReturn(false, true).when(messageConsumerUnderTest).isStopped();
- doReturn(messageRouterConsumerMock).when(messageConsumerUnderTest).getMessageRouterConsumer();
-
- Mono<ResponseEntity<String>> response = Mono.just(new ResponseEntity<>("Error", HttpStatus.BAD_REQUEST));
- when(messageRouterConsumerMock.getForEntity(any())).thenReturn(response);
+ setTaskNumberOfLoops(2);
+ disableTaskDelay();
+ setUpMrConfig();
- final ListAppender<ILoggingEvent> logAppender =
- LoggingUtils.getLogListAppender(DmaapMessageConsumer.class, WARN);
+ {
+ Mono<String> dmaapError = Mono.error(new ServiceException("dmaapError"));
+ Mono<String> dmaapResponse = Mono.just(dmaapRequestMessageString());
+ doReturn(dmaapError, dmaapResponse).when(messageConsumerUnderTest).getFromMessageRouter(anyString());
+ }
- messageConsumerUnderTest.start().join();
+ doReturn(Mono.just("response1")).when(messageConsumerUnderTest).handleDmaapMsg(any());
- assertThat(logAppender.list.get(0).getFormattedMessage())
- .isEqualTo("Cannot fetch because of Error respons: 400 BAD_REQUEST Error");
+ String s = messageConsumerUnderTest.createTask().blockLast();
- verify(messageConsumerUnderTest).sleep(DmaapMessageConsumer.TIME_BETWEEN_DMAAP_RETRIES);
+ verify(messageConsumerUnderTest, times(2)).getFromMessageRouter(anyString());
+ verify(messageConsumerUnderTest, times(0)).sendErrorResponse(anyString());
+ verify(messageConsumerUnderTest, times(1)).delay();
+ verify(messageConsumerUnderTest, times(1)).handleDmaapMsg(dmaapRequestMessage());
+ assertEquals("response1", s);
}
@Test
- void dmaapConfiguredAndOneMessage_thenPollOnceAndProcessMessage() throws Exception {
- // The message from MR is here an array of Json objects
- setUpMrConfig();
+ void unParsableMessage_thenSendResponseAndContinue() throws Exception {
messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock));
+ setTaskNumberOfLoops(2);
+ setUpMrConfig();
- String messages = jsonArray(gson.toJson(dmaapRequestMessage(Operation.PUT)));
-
- doReturn(false, true).when(messageConsumerUnderTest).isStopped();
- doReturn(messageRouterConsumerMock).when(messageConsumerUnderTest).getMessageRouterConsumer();
-
- Mono<ResponseEntity<String>> response = Mono.just(new ResponseEntity<>(messages, HttpStatus.OK));
- when(messageRouterConsumerMock.getForEntity(any())).thenReturn(response);
+ {
+ Mono<String> dmaapError = Mono.just("Non valid JSON \"");
+ Mono<String> dmaapResponse = Mono.just(dmaapRequestMessageString());
+ doReturn(dmaapError, dmaapResponse).when(messageConsumerUnderTest).getFromMessageRouter(anyString());
+ }
- doReturn(messageHandlerMock).when(messageConsumerUnderTest).getDmaapMessageHandler();
+ doReturn(Mono.just("response1")).when(messageConsumerUnderTest).handleDmaapMsg(any());
- messageConsumerUnderTest.start().join();
+ String s = messageConsumerUnderTest.createTask().blockLast();
+ assertEquals("response1", s);
- ArgumentCaptor<DmaapRequestMessage> captor = ArgumentCaptor.forClass(DmaapRequestMessage.class);
- verify(messageHandlerMock).handleDmaapMsg(captor.capture());
- DmaapRequestMessage messageAfterJsonParsing = captor.getValue();
- assertThat(messageAfterJsonParsing.apiVersion()).isNotEmpty();
+ verify(messageConsumerUnderTest, times(2)).getFromMessageRouter(anyString());
+ verify(messageConsumerUnderTest, times(1)).sendErrorResponse(anyString());
+ verify(messageConsumerUnderTest, times(0)).delay();
+ verify(messageConsumerUnderTest, times(1)).handleDmaapMsg(dmaapRequestMessage());
+ }
- verifyNoMoreInteractions(messageHandlerMock);
+ private String dmaapRequestMessageString() {
+ String json = gson.toJson(dmaapRequestMessage());
+ return jsonArray(json);
}
@Test
void testMessageParsing() throws ServiceException {
messageConsumerUnderTest = new DmaapMessageConsumer(applicationConfigMock);
- String json = gson.toJson(dmaapRequestMessage(Operation.PUT));
+ String json = gson.toJson(dmaapRequestMessage());
{
String jsonArrayOfObject = jsonArray(json);
- List<DmaapRequestMessage> parsedMessage = messageConsumerUnderTest.parseMessages(jsonArrayOfObject);
+ DmaapRequestMessage parsedMessage =
+ messageConsumerUnderTest.parseReceivedMessage(jsonArrayOfObject).blockLast();
assertNotNull(parsedMessage);
- assertTrue(parsedMessage.get(0).payload().isPresent());
+ assertTrue(parsedMessage.payload().isPresent());
}
{
String jsonArrayOfString = jsonArray(quote(json));
- List<DmaapRequestMessage> parsedMessage = messageConsumerUnderTest.parseMessages(jsonArrayOfString);
+ DmaapRequestMessage parsedMessage =
+ messageConsumerUnderTest.parseReceivedMessage(jsonArrayOfString).blockLast();
assertNotNull(parsedMessage);
- assertTrue(parsedMessage.get(0).payload().isPresent());
+ assertTrue(parsedMessage.payload().isPresent());
}
}
- @Test
- void incomingUnparsableRequest_thenSendResponse() throws Exception {
- messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock));
- doReturn(messageHandlerMock).when(messageConsumerUnderTest).getDmaapMessageHandler();
- doReturn(Mono.just("OK")).when(messageHandlerMock).sendDmaapResponse(any(), any(), any());
- Exception actualException =
- assertThrows(ServiceException.class, () -> messageConsumerUnderTest.parseMessages("[\"abc:\"def\"]"));
- assertThat(actualException.getMessage())
- .contains("Could not parse incomming request. Reason :com.google.gson.stream.MalformedJsonException");
-
- verify(messageHandlerMock).sendDmaapResponse(any(), any(), any());
- }
-
- @Test
- void incomingUnparsableRequest_thenSendingResponseFailed() throws Exception {
- messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock));
- doReturn(messageHandlerMock).when(messageConsumerUnderTest).getDmaapMessageHandler();
- doReturn(Mono.error(new Exception("Sending response failed"))).when(messageHandlerMock).sendDmaapResponse(any(),
- any(), any());
- Exception actualException =
- assertThrows(Exception.class, () -> messageConsumerUnderTest.parseMessages("[\"abc:\"def\"]"));
- assertThat(actualException.getMessage()).contains("Sending response failed");
-
- verify(messageHandlerMock).sendDmaapResponse(any(), any(), any());
- }
-
private void setUpMrConfig() {
when(applicationConfigMock.getDmaapConsumerTopicUrl()).thenReturn("url");
when(applicationConfigMock.getDmaapProducerTopicUrl()).thenReturn("url");
@@ -237,11 +189,11 @@ class DmaapMessageConsumerTest {
return "\"" + s.replace("\"", "\\\"") + "\"";
}
- private DmaapRequestMessage dmaapRequestMessage(Operation operation) {
+ private DmaapRequestMessage dmaapRequestMessage() {
return ImmutableDmaapRequestMessage.builder() //
.apiVersion("apiVersion") //
.correlationId("correlationId") //
- .operation(operation) //
+ .operation(Operation.PUT) //
.originatorId("originatorId") //
.payload(new JsonObject()) //
.requestId("requestId") //
diff --git a/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/dmaap/DmaapMessageHandlerTest.java b/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/dmaap/DmaapMessageHandlerTest.java
index 99468811..df84ae05 100644
--- a/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/dmaap/DmaapMessageHandlerTest.java
+++ b/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/dmaap/DmaapMessageHandlerTest.java
@@ -110,7 +110,7 @@ class DmaapMessageHandlerTest {
DmaapRequestMessage message = dmaapRequestMessage(Operation.DELETE);
StepVerifier //
- .create(testedObject.createTask(message)) //
+ .create(testedObject.handleDmaapMsg(message)) //
.expectSubscription() //
.expectNext("OK") //
.verifyComplete(); //
@@ -130,7 +130,7 @@ class DmaapMessageHandlerTest {
DmaapRequestMessage message = dmaapRequestMessage(Operation.GET);
StepVerifier //
- .create(testedObject.createTask(message)) //
+ .create(testedObject.handleDmaapMsg(message)) //
.expectSubscription() //
.expectNext("OK") //
.verifyComplete(); //
@@ -152,7 +152,7 @@ class DmaapMessageHandlerTest {
DmaapRequestMessage message = dmaapRequestMessage(Operation.GET);
StepVerifier //
- .create(testedObject.createTask(message)) //
+ .create(testedObject.handleDmaapMsg(message)) //
.expectSubscription() //
.verifyComplete(); //
@@ -170,7 +170,7 @@ class DmaapMessageHandlerTest {
DmaapRequestMessage message = dmaapRequestMessage(Operation.PUT);
StepVerifier //
- .create(testedObject.createTask(message)) //
+ .create(testedObject.handleDmaapMsg(message)) //
.expectSubscription() //
.expectNext("OK") //
.verifyComplete(); //
@@ -189,7 +189,7 @@ class DmaapMessageHandlerTest {
DmaapRequestMessage message = dmaapRequestMessage(Operation.POST);
StepVerifier //
- .create(testedObject.createTask(message)) //
+ .create(testedObject.handleDmaapMsg(message)) //
.expectSubscription() //
.expectNext("OK") //
.verifyComplete(); //
@@ -208,7 +208,7 @@ class DmaapMessageHandlerTest {
doReturn(Mono.just("OK")).when(dmaapClient).post(anyString(), anyString());
DmaapRequestMessage message = dmaapRequestMessage(Operation.PUT);
- testedObject.createTask(message).block();
+ testedObject.handleDmaapMsg(message).block();
verify(pmsClient).putForEntity(anyString(), anyString());
verifyNoMoreInteractions(pmsClient);
@@ -239,7 +239,8 @@ class DmaapMessageHandlerTest {
final ListAppender<ILoggingEvent> logAppender =
LoggingUtils.getLogListAppender(DmaapMessageHandler.class, WARN);
- testedObject.handleDmaapMsg(message);
+ doReturn(notOkResponse()).when(pmsClient).putForEntity(anyString(), anyString());
+ testedObject.handleDmaapMsg(message).block();
assertThat(logAppender.list.get(0).getFormattedMessage())
.startsWith("Expected payload in message from DMAAP: ");
diff --git a/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RefreshConfigTaskTest.java b/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RefreshConfigTaskTest.java
index 2e96b680..4cc23607 100644
--- a/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RefreshConfigTaskTest.java
+++ b/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RefreshConfigTaskTest.java
@@ -134,17 +134,6 @@ class RefreshConfigTaskTest {
}
@Test
- void stop_thenTaskIsDisposed() throws Exception {
- refreshTaskUnderTest = this.createTestObject(CONFIG_FILE_DOES_NOT_EXIST, null, null, false);
- refreshTaskUnderTest.systemEnvironment = new Properties();
-
- refreshTaskUnderTest.start();
- refreshTaskUnderTest.stop();
-
- assertThat(refreshTaskUnderTest.getRefreshTask().isDisposed()).as("Refresh task is disposed").isTrue();
- }
-
- @Test
void whenTheConfigurationFits_thenConfiguredRicsArePutInRepository() throws Exception {
refreshTaskUnderTest = this.createTestObject(CONFIG_FILE_EXISTS);
refreshTaskUnderTest.systemEnvironment = new Properties();