From f86407dfdf0e04979a6765da4eb13f9983e1150e Mon Sep 17 00:00:00 2001 From: PatrikBuhr Date: Wed, 14 Oct 2020 14:14:06 +0200 Subject: Made DmaapMessageConsumer asynchronuous Change-Id: Ib3d4951f3f9b2061353b5e50f427559a3781b10e Issue-ID: CCSDK-2502 Signed-off-by: PatrikBuhr --- .../dmaap/DmaapMessageConsumerTest.java | 190 ++++++++------------- .../dmaap/DmaapMessageHandlerTest.java | 15 +- .../tasks/RefreshConfigTaskTest.java | 11 -- 3 files changed, 79 insertions(+), 137 deletions(-) (limited to 'a1-policy-management/src/test/java/org/onap') diff --git a/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/dmaap/DmaapMessageConsumerTest.java b/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/dmaap/DmaapMessageConsumerTest.java index b6d3cc07..72ca84a5 100644 --- a/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/dmaap/DmaapMessageConsumerTest.java +++ b/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/dmaap/DmaapMessageConsumerTest.java @@ -20,36 +20,26 @@ package org.onap.ccsdk.oran.a1policymanagementservice.dmaap; -import static ch.qos.logback.classic.Level.WARN; -import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; -import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.doNothing; +import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; -import ch.qos.logback.classic.spi.ILoggingEvent; -import ch.qos.logback.core.read.ListAppender; - import com.google.gson.Gson; import com.google.gson.GsonBuilder; import com.google.gson.JsonObject; -import java.time.Duration; -import java.util.LinkedList; -import java.util.List; +import java.util.ArrayList; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; -import org.mockito.ArgumentCaptor; -import org.mockito.InOrder; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; import org.onap.ccsdk.oran.a1policymanagementservice.clients.AsyncRestClient; @@ -57,8 +47,8 @@ import org.onap.ccsdk.oran.a1policymanagementservice.configuration.ApplicationCo import org.onap.ccsdk.oran.a1policymanagementservice.dmaap.DmaapRequestMessage.Operation; import org.onap.ccsdk.oran.a1policymanagementservice.exceptions.ServiceException; import org.onap.ccsdk.oran.a1policymanagementservice.utils.LoggingUtils; -import org.springframework.http.HttpStatus; -import org.springframework.http.ResponseEntity; + +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @ExtendWith(MockitoExtension.class) @@ -79,151 +69,113 @@ class DmaapMessageConsumerTest { LoggingUtils.getLogListAppender(DmaapMessageConsumer.class); } - @Test - void dmaapNotConfigured_thenSleepAndRetryUntilConfig() throws Exception { - messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock)); - - doNothing().when(messageConsumerUnderTest).sleep(any(Duration.class)); - doReturn(false, false, false, true).when(messageConsumerUnderTest).isStopped(); - doReturn(false, true, true).when(messageConsumerUnderTest).isDmaapConfigured(); - doReturn(new LinkedList<>()).when(messageConsumerUnderTest).fetchAllMessages(); - - messageConsumerUnderTest.start().join(); - - InOrder orderVerifier = inOrder(messageConsumerUnderTest); - orderVerifier.verify(messageConsumerUnderTest).sleep(DmaapMessageConsumer.TIME_BETWEEN_DMAAP_RETRIES); - orderVerifier.verify(messageConsumerUnderTest).fetchAllMessages(); + private void setTaskNumberOfLoops(int number) { + ArrayList l = new ArrayList<>(); + for (int i = 0; i < number; ++i) { + l.add(i); + } + Flux f = Flux.fromIterable(l); + doReturn(f).when(messageConsumerUnderTest).infiniteFlux(); } - @Test - void dmaapConfigurationRemoved_thenStopPollingDmaapSleepAndRetry() throws Exception { - messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock)); - - doNothing().when(messageConsumerUnderTest).sleep(any(Duration.class)); - doReturn(false, false, false, false, true).when(messageConsumerUnderTest).isStopped(); - doReturn(true, true, false).when(messageConsumerUnderTest).isDmaapConfigured(); - doReturn(new LinkedList<>()).when(messageConsumerUnderTest).fetchAllMessages(); - - messageConsumerUnderTest.start().join(); - - InOrder orderVerifier = inOrder(messageConsumerUnderTest); - orderVerifier.verify(messageConsumerUnderTest).fetchAllMessages(); - orderVerifier.verify(messageConsumerUnderTest).sleep(DmaapMessageConsumer.TIME_BETWEEN_DMAAP_RETRIES); + private void disableTaskDelay() { + doReturn(Mono.empty()).when(messageConsumerUnderTest).delay(); } @Test - void dmaapConfiguredAndNoMessages_thenPollOnce() throws Exception { - setUpMrConfig(); - + void successfulCase_dmaapNotConfigured_thenSleepAndRetryUntilConfig() throws Exception { messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock)); - Mono> response = Mono.empty(); + setTaskNumberOfLoops(3); + disableTaskDelay(); - doReturn(false, true).when(messageConsumerUnderTest).isStopped(); - doReturn(messageRouterConsumerMock).when(messageConsumerUnderTest).getMessageRouterConsumer(); - doReturn(response).when(messageRouterConsumerMock).getForEntity(any()); + when(this.applicationConfigMock.getDmaapConsumerTopicUrl()).thenReturn("getDmaapConsumerTopicUrl"); + doReturn(false, false, true).when(messageConsumerUnderTest).isDmaapConfigured(); + doReturn(Mono.just(dmaapRequestMessageString())).when(messageConsumerUnderTest) + .getFromMessageRouter(anyString()); - messageConsumerUnderTest.start().join(); + doReturn(Mono.just("responseFromHandler")).when(messageConsumerUnderTest).handleDmaapMsg(any()); - verify(messageRouterConsumerMock).getForEntity(any()); - verifyNoMoreInteractions(messageRouterConsumerMock); + String s = messageConsumerUnderTest.createTask().blockLast(); + assertEquals("responseFromHandler", s); + verify(messageConsumerUnderTest, times(2)).delay(); + verify(messageConsumerUnderTest, times(1)).handleDmaapMsg(dmaapRequestMessage()); } @Test - void dmaapConfiguredAndErrorGettingMessages_thenLogWarningAndSleep() throws Exception { - setUpMrConfig(); - + void returnErrorFromDmapp_thenSleepAndRetry() throws Exception { messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock)); - doNothing().when(messageConsumerUnderTest).sleep(any(Duration.class)); - doReturn(false, true).when(messageConsumerUnderTest).isStopped(); - doReturn(messageRouterConsumerMock).when(messageConsumerUnderTest).getMessageRouterConsumer(); - - Mono> response = Mono.just(new ResponseEntity<>("Error", HttpStatus.BAD_REQUEST)); - when(messageRouterConsumerMock.getForEntity(any())).thenReturn(response); + setTaskNumberOfLoops(2); + disableTaskDelay(); + setUpMrConfig(); - final ListAppender logAppender = - LoggingUtils.getLogListAppender(DmaapMessageConsumer.class, WARN); + { + Mono dmaapError = Mono.error(new ServiceException("dmaapError")); + Mono dmaapResponse = Mono.just(dmaapRequestMessageString()); + doReturn(dmaapError, dmaapResponse).when(messageConsumerUnderTest).getFromMessageRouter(anyString()); + } - messageConsumerUnderTest.start().join(); + doReturn(Mono.just("response1")).when(messageConsumerUnderTest).handleDmaapMsg(any()); - assertThat(logAppender.list.get(0).getFormattedMessage()) - .isEqualTo("Cannot fetch because of Error respons: 400 BAD_REQUEST Error"); + String s = messageConsumerUnderTest.createTask().blockLast(); - verify(messageConsumerUnderTest).sleep(DmaapMessageConsumer.TIME_BETWEEN_DMAAP_RETRIES); + verify(messageConsumerUnderTest, times(2)).getFromMessageRouter(anyString()); + verify(messageConsumerUnderTest, times(0)).sendErrorResponse(anyString()); + verify(messageConsumerUnderTest, times(1)).delay(); + verify(messageConsumerUnderTest, times(1)).handleDmaapMsg(dmaapRequestMessage()); + assertEquals("response1", s); } @Test - void dmaapConfiguredAndOneMessage_thenPollOnceAndProcessMessage() throws Exception { - // The message from MR is here an array of Json objects - setUpMrConfig(); + void unParsableMessage_thenSendResponseAndContinue() throws Exception { messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock)); + setTaskNumberOfLoops(2); + setUpMrConfig(); - String messages = jsonArray(gson.toJson(dmaapRequestMessage(Operation.PUT))); - - doReturn(false, true).when(messageConsumerUnderTest).isStopped(); - doReturn(messageRouterConsumerMock).when(messageConsumerUnderTest).getMessageRouterConsumer(); - - Mono> response = Mono.just(new ResponseEntity<>(messages, HttpStatus.OK)); - when(messageRouterConsumerMock.getForEntity(any())).thenReturn(response); + { + Mono dmaapError = Mono.just("Non valid JSON \""); + Mono dmaapResponse = Mono.just(dmaapRequestMessageString()); + doReturn(dmaapError, dmaapResponse).when(messageConsumerUnderTest).getFromMessageRouter(anyString()); + } - doReturn(messageHandlerMock).when(messageConsumerUnderTest).getDmaapMessageHandler(); + doReturn(Mono.just("response1")).when(messageConsumerUnderTest).handleDmaapMsg(any()); - messageConsumerUnderTest.start().join(); + String s = messageConsumerUnderTest.createTask().blockLast(); + assertEquals("response1", s); - ArgumentCaptor captor = ArgumentCaptor.forClass(DmaapRequestMessage.class); - verify(messageHandlerMock).handleDmaapMsg(captor.capture()); - DmaapRequestMessage messageAfterJsonParsing = captor.getValue(); - assertThat(messageAfterJsonParsing.apiVersion()).isNotEmpty(); + verify(messageConsumerUnderTest, times(2)).getFromMessageRouter(anyString()); + verify(messageConsumerUnderTest, times(1)).sendErrorResponse(anyString()); + verify(messageConsumerUnderTest, times(0)).delay(); + verify(messageConsumerUnderTest, times(1)).handleDmaapMsg(dmaapRequestMessage()); + } - verifyNoMoreInteractions(messageHandlerMock); + private String dmaapRequestMessageString() { + String json = gson.toJson(dmaapRequestMessage()); + return jsonArray(json); } @Test void testMessageParsing() throws ServiceException { messageConsumerUnderTest = new DmaapMessageConsumer(applicationConfigMock); - String json = gson.toJson(dmaapRequestMessage(Operation.PUT)); + String json = gson.toJson(dmaapRequestMessage()); { String jsonArrayOfObject = jsonArray(json); - List parsedMessage = messageConsumerUnderTest.parseMessages(jsonArrayOfObject); + DmaapRequestMessage parsedMessage = + messageConsumerUnderTest.parseReceivedMessage(jsonArrayOfObject).blockLast(); assertNotNull(parsedMessage); - assertTrue(parsedMessage.get(0).payload().isPresent()); + assertTrue(parsedMessage.payload().isPresent()); } { String jsonArrayOfString = jsonArray(quote(json)); - List parsedMessage = messageConsumerUnderTest.parseMessages(jsonArrayOfString); + DmaapRequestMessage parsedMessage = + messageConsumerUnderTest.parseReceivedMessage(jsonArrayOfString).blockLast(); assertNotNull(parsedMessage); - assertTrue(parsedMessage.get(0).payload().isPresent()); + assertTrue(parsedMessage.payload().isPresent()); } } - @Test - void incomingUnparsableRequest_thenSendResponse() throws Exception { - messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock)); - doReturn(messageHandlerMock).when(messageConsumerUnderTest).getDmaapMessageHandler(); - doReturn(Mono.just("OK")).when(messageHandlerMock).sendDmaapResponse(any(), any(), any()); - Exception actualException = - assertThrows(ServiceException.class, () -> messageConsumerUnderTest.parseMessages("[\"abc:\"def\"]")); - assertThat(actualException.getMessage()) - .contains("Could not parse incomming request. Reason :com.google.gson.stream.MalformedJsonException"); - - verify(messageHandlerMock).sendDmaapResponse(any(), any(), any()); - } - - @Test - void incomingUnparsableRequest_thenSendingResponseFailed() throws Exception { - messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock)); - doReturn(messageHandlerMock).when(messageConsumerUnderTest).getDmaapMessageHandler(); - doReturn(Mono.error(new Exception("Sending response failed"))).when(messageHandlerMock).sendDmaapResponse(any(), - any(), any()); - Exception actualException = - assertThrows(Exception.class, () -> messageConsumerUnderTest.parseMessages("[\"abc:\"def\"]")); - assertThat(actualException.getMessage()).contains("Sending response failed"); - - verify(messageHandlerMock).sendDmaapResponse(any(), any(), any()); - } - private void setUpMrConfig() { when(applicationConfigMock.getDmaapConsumerTopicUrl()).thenReturn("url"); when(applicationConfigMock.getDmaapProducerTopicUrl()).thenReturn("url"); @@ -237,11 +189,11 @@ class DmaapMessageConsumerTest { return "\"" + s.replace("\"", "\\\"") + "\""; } - private DmaapRequestMessage dmaapRequestMessage(Operation operation) { + private DmaapRequestMessage dmaapRequestMessage() { return ImmutableDmaapRequestMessage.builder() // .apiVersion("apiVersion") // .correlationId("correlationId") // - .operation(operation) // + .operation(Operation.PUT) // .originatorId("originatorId") // .payload(new JsonObject()) // .requestId("requestId") // diff --git a/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/dmaap/DmaapMessageHandlerTest.java b/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/dmaap/DmaapMessageHandlerTest.java index 99468811..df84ae05 100644 --- a/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/dmaap/DmaapMessageHandlerTest.java +++ b/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/dmaap/DmaapMessageHandlerTest.java @@ -110,7 +110,7 @@ class DmaapMessageHandlerTest { DmaapRequestMessage message = dmaapRequestMessage(Operation.DELETE); StepVerifier // - .create(testedObject.createTask(message)) // + .create(testedObject.handleDmaapMsg(message)) // .expectSubscription() // .expectNext("OK") // .verifyComplete(); // @@ -130,7 +130,7 @@ class DmaapMessageHandlerTest { DmaapRequestMessage message = dmaapRequestMessage(Operation.GET); StepVerifier // - .create(testedObject.createTask(message)) // + .create(testedObject.handleDmaapMsg(message)) // .expectSubscription() // .expectNext("OK") // .verifyComplete(); // @@ -152,7 +152,7 @@ class DmaapMessageHandlerTest { DmaapRequestMessage message = dmaapRequestMessage(Operation.GET); StepVerifier // - .create(testedObject.createTask(message)) // + .create(testedObject.handleDmaapMsg(message)) // .expectSubscription() // .verifyComplete(); // @@ -170,7 +170,7 @@ class DmaapMessageHandlerTest { DmaapRequestMessage message = dmaapRequestMessage(Operation.PUT); StepVerifier // - .create(testedObject.createTask(message)) // + .create(testedObject.handleDmaapMsg(message)) // .expectSubscription() // .expectNext("OK") // .verifyComplete(); // @@ -189,7 +189,7 @@ class DmaapMessageHandlerTest { DmaapRequestMessage message = dmaapRequestMessage(Operation.POST); StepVerifier // - .create(testedObject.createTask(message)) // + .create(testedObject.handleDmaapMsg(message)) // .expectSubscription() // .expectNext("OK") // .verifyComplete(); // @@ -208,7 +208,7 @@ class DmaapMessageHandlerTest { doReturn(Mono.just("OK")).when(dmaapClient).post(anyString(), anyString()); DmaapRequestMessage message = dmaapRequestMessage(Operation.PUT); - testedObject.createTask(message).block(); + testedObject.handleDmaapMsg(message).block(); verify(pmsClient).putForEntity(anyString(), anyString()); verifyNoMoreInteractions(pmsClient); @@ -239,7 +239,8 @@ class DmaapMessageHandlerTest { final ListAppender logAppender = LoggingUtils.getLogListAppender(DmaapMessageHandler.class, WARN); - testedObject.handleDmaapMsg(message); + doReturn(notOkResponse()).when(pmsClient).putForEntity(anyString(), anyString()); + testedObject.handleDmaapMsg(message).block(); assertThat(logAppender.list.get(0).getFormattedMessage()) .startsWith("Expected payload in message from DMAAP: "); diff --git a/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RefreshConfigTaskTest.java b/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RefreshConfigTaskTest.java index 2e96b680..4cc23607 100644 --- a/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RefreshConfigTaskTest.java +++ b/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RefreshConfigTaskTest.java @@ -133,17 +133,6 @@ class RefreshConfigTaskTest { return obj; } - @Test - void stop_thenTaskIsDisposed() throws Exception { - refreshTaskUnderTest = this.createTestObject(CONFIG_FILE_DOES_NOT_EXIST, null, null, false); - refreshTaskUnderTest.systemEnvironment = new Properties(); - - refreshTaskUnderTest.start(); - refreshTaskUnderTest.stop(); - - assertThat(refreshTaskUnderTest.getRefreshTask().isDisposed()).as("Refresh task is disposed").isTrue(); - } - @Test void whenTheConfigurationFits_thenConfiguredRicsArePutInRepository() throws Exception { refreshTaskUnderTest = this.createTestObject(CONFIG_FILE_EXISTS); -- cgit