diff options
author | PatrikBuhr <patrik.buhr@est.tech> | 2020-10-14 14:14:06 +0200 |
---|---|---|
committer | PatrikBuhr <patrik.buhr@est.tech> | 2020-10-19 08:34:58 +0200 |
commit | f86407dfdf0e04979a6765da4eb13f9983e1150e (patch) | |
tree | d5a2fb87cfbbd062bf0e3d613cc8a0ee04e7b18d | |
parent | df373ec4c902a2596dd2dfe957425af1e3113b17 (diff) |
Made DmaapMessageConsumer asynchronuous
Change-Id: Ib3d4951f3f9b2061353b5e50f427559a3781b10e
Issue-ID: CCSDK-2502
Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
6 files changed, 184 insertions, 223 deletions
diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/AsyncRestClient.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/AsyncRestClient.java index 8409f45c..7f453a27 100644 --- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/AsyncRestClient.java +++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/AsyncRestClient.java @@ -188,7 +188,7 @@ public class AsyncRestClient { logger.debug("{} HTTP error status = '{}', body '{}'", traceTag, exception.getStatusCode(), exception.getResponseBodyAsString()); } else { - logger.debug("{} HTTP error", traceTag, t); + logger.debug("{} HTTP error {}", traceTag, t.getMessage()); } } diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/dmaap/DmaapMessageConsumer.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/dmaap/DmaapMessageConsumer.java index 3a365178..f948e5f5 100644 --- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/dmaap/DmaapMessageConsumer.java +++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/dmaap/DmaapMessageConsumer.java @@ -20,7 +20,6 @@ package org.onap.ccsdk.oran.a1policymanagementservice.dmaap; -import com.google.common.collect.Iterables; import com.google.gson.Gson; import com.google.gson.GsonBuilder; import com.google.gson.JsonArray; @@ -37,15 +36,17 @@ import java.util.ServiceLoader; import org.onap.ccsdk.oran.a1policymanagementservice.clients.AsyncRestClient; import org.onap.ccsdk.oran.a1policymanagementservice.clients.AsyncRestClientFactory; import org.onap.ccsdk.oran.a1policymanagementservice.configuration.ApplicationConfig; -import org.onap.ccsdk.oran.a1policymanagementservice.exceptions.ServiceException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.http.HttpStatus; -import org.springframework.http.ResponseEntity; import org.springframework.stereotype.Component; +import reactor.core.publisher.Flux; +import reactor.core.publisher.FluxSink; +import reactor.core.publisher.Mono; + /** * The class fetches incoming requests from DMAAP. It uses the timeout parameter * that lets the MessageRouter keep the connection with the Kafka open until @@ -74,60 +75,82 @@ public class DmaapMessageConsumer { private final AsyncRestClientFactory restClientFactory; + private final InfiniteFlux infiniteSubmitter = new InfiniteFlux(); + @Value("${server.http-port}") private int localServerHttpPort; + private static class InfiniteFlux { + private FluxSink<Integer> sink; + private int counter = 0; + + public synchronized Flux<Integer> start() { + stop(); + return Flux.create(this::next).doOnRequest(this::onRequest); + } + + public synchronized void stop() { + if (this.sink != null) { + this.sink.complete(); + this.sink = null; + } + } + + void onRequest(long no) { + logger.debug("InfiniteFlux.onRequest {}", no); + for (long i = 0; i < no; ++i) { + sink.next(counter++); + } + } + + void next(FluxSink<Integer> sink) { + logger.debug("InfiniteFlux.next"); + this.sink = sink; + sink.next(counter++); + } + + } + @Autowired public DmaapMessageConsumer(ApplicationConfig applicationConfig) { this.applicationConfig = applicationConfig; GsonBuilder gsonBuilder = new GsonBuilder(); ServiceLoader.load(TypeAdapterFactory.class).forEach(gsonBuilder::registerTypeAdapterFactory); - gson = gsonBuilder.create(); + this.gson = gsonBuilder.create(); this.restClientFactory = new AsyncRestClientFactory(applicationConfig.getWebClientConfig()); } /** - * Starts the consumer. If there is a DMaaP configuration, it will start polling - * for messages. Otherwise it will check regularly for the configuration. + * Starts the DMAAP consumer. If there is a DMaaP configuration, it will start + * polling for messages. Otherwise it will check regularly for the + * configuration. * - * @return the running thread, for test purposes. */ - public Thread start() { - Thread thread = new Thread(this::messageHandlingLoop); - thread.start(); - return thread; + public void start() { + infiniteSubmitter.stop(); + + createTask().subscribe(// + value -> logger.debug("DmaapMessageConsumer next: {}", value), // + throwable -> logger.error("DmaapMessageConsumer error: {}", throwable), // + () -> logger.warn("DmaapMessageConsumer stopped") // + ); } - private void messageHandlingLoop() { - while (!isStopped()) { - try { - if (isDmaapConfigured()) { - Iterable<DmaapRequestMessage> dmaapMsgs = fetchAllMessages(); - if (dmaapMsgs != null && Iterables.size(dmaapMsgs) > 0) { - logger.debug("Fetched all the messages from DMAAP and will start to process the messages"); - for (DmaapRequestMessage msg : dmaapMsgs) { - processMsg(msg); - } - } - } else { - sleep(TIME_BETWEEN_DMAAP_RETRIES); // wait for configuration - } - } catch (Exception e) { - logger.warn("{}", e.getMessage()); - sleep(TIME_BETWEEN_DMAAP_RETRIES); - } - } + protected Flux<String> createTask() { + return infiniteFlux() // + .flatMap(notUsed -> fetchFromDmaap(), 1) // + .doOnNext(message -> logger.debug("Message Reveived from DMAAP : {}", message)) // + .flatMap(this::parseReceivedMessage, 1)// + .flatMap(this::handleDmaapMsg, 1) // + .onErrorResume(throwable -> Mono.empty()); } - protected boolean isStopped() { - return false; + protected Flux<Integer> infiniteFlux() { + return infiniteSubmitter.start(); } - protected boolean isDmaapConfigured() { - String producerTopicUrl = applicationConfig.getDmaapProducerTopicUrl(); - String consumerTopicUrl = applicationConfig.getDmaapConsumerTopicUrl(); - return (producerTopicUrl != null && consumerTopicUrl != null && !producerTopicUrl.isEmpty() - && !consumerTopicUrl.isEmpty()); + protected Mono<Object> delay() { + return Mono.delay(TIME_BETWEEN_DMAAP_RETRIES).flatMap(o -> Mono.empty()); } private <T> List<T> parseList(String jsonString, Class<T> clazz) { @@ -146,7 +169,36 @@ public class DmaapMessageConsumer { return result; } - private void sendErrorResponse(String response) { + protected boolean isDmaapConfigured() { + String producerTopicUrl = applicationConfig.getDmaapProducerTopicUrl(); + String consumerTopicUrl = applicationConfig.getDmaapConsumerTopicUrl(); + return (producerTopicUrl != null && consumerTopicUrl != null && !producerTopicUrl.isEmpty() + && !consumerTopicUrl.isEmpty()); + } + + protected Mono<String> handleDmaapMsg(DmaapRequestMessage dmaapRequestMessage) { + return getDmaapMessageHandler().handleDmaapMsg(dmaapRequestMessage); + } + + protected Mono<String> getFromMessageRouter(String topicUrl) { + logger.trace("getFromMessageRouter {}", topicUrl); + AsyncRestClient c = restClientFactory.createRestClient(""); + return c.get(topicUrl); + } + + protected Flux<DmaapRequestMessage> parseReceivedMessage(String jsonString) { + try { + logger.trace("parseMessages {}", jsonString); + return Flux.fromIterable(parseList(jsonString, DmaapRequestMessage.class)); + } catch (Exception e) { + logger.error("parseMessages error {}", jsonString); + return sendErrorResponse("Could not parse: " + jsonString) // + .flatMapMany(s -> Flux.empty()); + } + } + + protected Mono<String> sendErrorResponse(String response) { + logger.debug("sendErrorResponse {}", response); DmaapRequestMessage fakeRequest = ImmutableDmaapRequestMessage.builder() // .apiVersion("") // .correlationId("") // @@ -158,37 +210,23 @@ public class DmaapMessageConsumer { .timestamp("") // .url("URL") // .build(); - getDmaapMessageHandler().sendDmaapResponse(response, fakeRequest, HttpStatus.BAD_REQUEST).block(); + return getDmaapMessageHandler().sendDmaapResponse(response, fakeRequest, HttpStatus.BAD_REQUEST) // + .onErrorResume(e -> Mono.empty()); } - List<DmaapRequestMessage> parseMessages(String jsonString) throws ServiceException { - try { - return parseList(jsonString, DmaapRequestMessage.class); - } catch (Exception e) { - sendErrorResponse("Not parsable request received, reason:" + e.toString() + ", input :" + jsonString); - throw new ServiceException("Could not parse incomming request. Reason :" + e.getMessage()); + private Mono<String> fetchFromDmaap() { + if (!this.isDmaapConfigured()) { + logger.debug("fetchFromDmaap, no action DMAAP not configured"); + return delay().flatMap(o -> Mono.empty()); } - } - - protected Iterable<DmaapRequestMessage> fetchAllMessages() throws ServiceException { + logger.debug("fetchFromDmaap"); String topicUrl = this.applicationConfig.getDmaapConsumerTopicUrl(); - AsyncRestClient consumer = getMessageRouterConsumer(); - ResponseEntity<String> response = consumer.getForEntity(topicUrl).block(); - logger.debug("DMaaP consumer received {} : {}", response.getStatusCode(), response.getBody()); - if (response.getStatusCode().is2xxSuccessful()) { - return parseMessages(response.getBody()); - } else { - throw new ServiceException("Cannot fetch because of Error respons: " + response.getStatusCode().toString() - + " " + response.getBody()); - } - } - private void processMsg(DmaapRequestMessage msg) { - logger.debug("Message Reveived from DMAAP : {}", msg); - getDmaapMessageHandler().handleDmaapMsg(msg); + return getFromMessageRouter(topicUrl) // + .onErrorResume(throwable -> delay().flatMap(o -> Mono.empty())); } - protected DmaapMessageHandler getDmaapMessageHandler() { + private DmaapMessageHandler getDmaapMessageHandler() { if (this.dmaapMessageHandler == null) { String pmsBaseUrl = "http://localhost:" + this.localServerHttpPort; AsyncRestClient pmsClient = restClientFactory.createRestClient(pmsBaseUrl); @@ -199,16 +237,4 @@ public class DmaapMessageConsumer { return this.dmaapMessageHandler; } - protected void sleep(Duration duration) { - try { - Thread.sleep(duration.toMillis()); - } catch (Exception e) { - logger.error("Failed to put the thread to sleep", e); - } - } - - protected AsyncRestClient getMessageRouterConsumer() { - return restClientFactory.createRestClient(""); - } - } diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/dmaap/DmaapMessageHandler.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/dmaap/DmaapMessageHandler.java index 967cab1d..c77087a5 100644 --- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/dmaap/DmaapMessageHandler.java +++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/dmaap/DmaapMessageHandler.java @@ -54,20 +54,13 @@ public class DmaapMessageHandler { this.dmaapClient = dmaapClient; } - public void handleDmaapMsg(DmaapRequestMessage msg) { - try { - String result = this.createTask(msg).block(); - logger.debug("handleDmaapMsg: {}", result); - } catch (Exception throwable) { - logger.warn("handleDmaapMsg failure {}", throwable.getMessage()); - } - } - - Mono<String> createTask(DmaapRequestMessage dmaapRequestMessage) { + public Mono<String> handleDmaapMsg(DmaapRequestMessage dmaapRequestMessage) { return this.invokePolicyManagementService(dmaapRequestMessage) // .onErrorResume(t -> handlePolicyManagementServiceCallError(t, dmaapRequestMessage)) // .flatMap(response -> sendDmaapResponse(response.getBody(), dmaapRequestMessage, - response.getStatusCode())); + response.getStatusCode())) + .doOnError(t -> logger.warn("Failed to handle DMAAP message : {}", t.getMessage()))// + .onErrorResume(t -> Mono.empty()); } private Mono<ResponseEntity<String>> handlePolicyManagementServiceCallError(Throwable error, diff --git a/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/dmaap/DmaapMessageConsumerTest.java b/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/dmaap/DmaapMessageConsumerTest.java index b6d3cc07..72ca84a5 100644 --- a/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/dmaap/DmaapMessageConsumerTest.java +++ b/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/dmaap/DmaapMessageConsumerTest.java @@ -20,36 +20,26 @@ package org.onap.ccsdk.oran.a1policymanagementservice.dmaap; -import static ch.qos.logback.classic.Level.WARN; -import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; -import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.doNothing; +import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; -import ch.qos.logback.classic.spi.ILoggingEvent; -import ch.qos.logback.core.read.ListAppender; - import com.google.gson.Gson; import com.google.gson.GsonBuilder; import com.google.gson.JsonObject; -import java.time.Duration; -import java.util.LinkedList; -import java.util.List; +import java.util.ArrayList; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; -import org.mockito.ArgumentCaptor; -import org.mockito.InOrder; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; import org.onap.ccsdk.oran.a1policymanagementservice.clients.AsyncRestClient; @@ -57,8 +47,8 @@ import org.onap.ccsdk.oran.a1policymanagementservice.configuration.ApplicationCo import org.onap.ccsdk.oran.a1policymanagementservice.dmaap.DmaapRequestMessage.Operation; import org.onap.ccsdk.oran.a1policymanagementservice.exceptions.ServiceException; import org.onap.ccsdk.oran.a1policymanagementservice.utils.LoggingUtils; -import org.springframework.http.HttpStatus; -import org.springframework.http.ResponseEntity; + +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @ExtendWith(MockitoExtension.class) @@ -79,151 +69,113 @@ class DmaapMessageConsumerTest { LoggingUtils.getLogListAppender(DmaapMessageConsumer.class); } - @Test - void dmaapNotConfigured_thenSleepAndRetryUntilConfig() throws Exception { - messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock)); - - doNothing().when(messageConsumerUnderTest).sleep(any(Duration.class)); - doReturn(false, false, false, true).when(messageConsumerUnderTest).isStopped(); - doReturn(false, true, true).when(messageConsumerUnderTest).isDmaapConfigured(); - doReturn(new LinkedList<>()).when(messageConsumerUnderTest).fetchAllMessages(); - - messageConsumerUnderTest.start().join(); - - InOrder orderVerifier = inOrder(messageConsumerUnderTest); - orderVerifier.verify(messageConsumerUnderTest).sleep(DmaapMessageConsumer.TIME_BETWEEN_DMAAP_RETRIES); - orderVerifier.verify(messageConsumerUnderTest).fetchAllMessages(); + private void setTaskNumberOfLoops(int number) { + ArrayList<Integer> l = new ArrayList<>(); + for (int i = 0; i < number; ++i) { + l.add(i); + } + Flux<Integer> f = Flux.fromIterable(l); + doReturn(f).when(messageConsumerUnderTest).infiniteFlux(); } - @Test - void dmaapConfigurationRemoved_thenStopPollingDmaapSleepAndRetry() throws Exception { - messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock)); - - doNothing().when(messageConsumerUnderTest).sleep(any(Duration.class)); - doReturn(false, false, false, false, true).when(messageConsumerUnderTest).isStopped(); - doReturn(true, true, false).when(messageConsumerUnderTest).isDmaapConfigured(); - doReturn(new LinkedList<>()).when(messageConsumerUnderTest).fetchAllMessages(); - - messageConsumerUnderTest.start().join(); - - InOrder orderVerifier = inOrder(messageConsumerUnderTest); - orderVerifier.verify(messageConsumerUnderTest).fetchAllMessages(); - orderVerifier.verify(messageConsumerUnderTest).sleep(DmaapMessageConsumer.TIME_BETWEEN_DMAAP_RETRIES); + private void disableTaskDelay() { + doReturn(Mono.empty()).when(messageConsumerUnderTest).delay(); } @Test - void dmaapConfiguredAndNoMessages_thenPollOnce() throws Exception { - setUpMrConfig(); - + void successfulCase_dmaapNotConfigured_thenSleepAndRetryUntilConfig() throws Exception { messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock)); - Mono<ResponseEntity<String>> response = Mono.empty(); + setTaskNumberOfLoops(3); + disableTaskDelay(); - doReturn(false, true).when(messageConsumerUnderTest).isStopped(); - doReturn(messageRouterConsumerMock).when(messageConsumerUnderTest).getMessageRouterConsumer(); - doReturn(response).when(messageRouterConsumerMock).getForEntity(any()); + when(this.applicationConfigMock.getDmaapConsumerTopicUrl()).thenReturn("getDmaapConsumerTopicUrl"); + doReturn(false, false, true).when(messageConsumerUnderTest).isDmaapConfigured(); + doReturn(Mono.just(dmaapRequestMessageString())).when(messageConsumerUnderTest) + .getFromMessageRouter(anyString()); - messageConsumerUnderTest.start().join(); + doReturn(Mono.just("responseFromHandler")).when(messageConsumerUnderTest).handleDmaapMsg(any()); - verify(messageRouterConsumerMock).getForEntity(any()); - verifyNoMoreInteractions(messageRouterConsumerMock); + String s = messageConsumerUnderTest.createTask().blockLast(); + assertEquals("responseFromHandler", s); + verify(messageConsumerUnderTest, times(2)).delay(); + verify(messageConsumerUnderTest, times(1)).handleDmaapMsg(dmaapRequestMessage()); } @Test - void dmaapConfiguredAndErrorGettingMessages_thenLogWarningAndSleep() throws Exception { - setUpMrConfig(); - + void returnErrorFromDmapp_thenSleepAndRetry() throws Exception { messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock)); - doNothing().when(messageConsumerUnderTest).sleep(any(Duration.class)); - doReturn(false, true).when(messageConsumerUnderTest).isStopped(); - doReturn(messageRouterConsumerMock).when(messageConsumerUnderTest).getMessageRouterConsumer(); - - Mono<ResponseEntity<String>> response = Mono.just(new ResponseEntity<>("Error", HttpStatus.BAD_REQUEST)); - when(messageRouterConsumerMock.getForEntity(any())).thenReturn(response); + setTaskNumberOfLoops(2); + disableTaskDelay(); + setUpMrConfig(); - final ListAppender<ILoggingEvent> logAppender = - LoggingUtils.getLogListAppender(DmaapMessageConsumer.class, WARN); + { + Mono<String> dmaapError = Mono.error(new ServiceException("dmaapError")); + Mono<String> dmaapResponse = Mono.just(dmaapRequestMessageString()); + doReturn(dmaapError, dmaapResponse).when(messageConsumerUnderTest).getFromMessageRouter(anyString()); + } - messageConsumerUnderTest.start().join(); + doReturn(Mono.just("response1")).when(messageConsumerUnderTest).handleDmaapMsg(any()); - assertThat(logAppender.list.get(0).getFormattedMessage()) - .isEqualTo("Cannot fetch because of Error respons: 400 BAD_REQUEST Error"); + String s = messageConsumerUnderTest.createTask().blockLast(); - verify(messageConsumerUnderTest).sleep(DmaapMessageConsumer.TIME_BETWEEN_DMAAP_RETRIES); + verify(messageConsumerUnderTest, times(2)).getFromMessageRouter(anyString()); + verify(messageConsumerUnderTest, times(0)).sendErrorResponse(anyString()); + verify(messageConsumerUnderTest, times(1)).delay(); + verify(messageConsumerUnderTest, times(1)).handleDmaapMsg(dmaapRequestMessage()); + assertEquals("response1", s); } @Test - void dmaapConfiguredAndOneMessage_thenPollOnceAndProcessMessage() throws Exception { - // The message from MR is here an array of Json objects - setUpMrConfig(); + void unParsableMessage_thenSendResponseAndContinue() throws Exception { messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock)); + setTaskNumberOfLoops(2); + setUpMrConfig(); - String messages = jsonArray(gson.toJson(dmaapRequestMessage(Operation.PUT))); - - doReturn(false, true).when(messageConsumerUnderTest).isStopped(); - doReturn(messageRouterConsumerMock).when(messageConsumerUnderTest).getMessageRouterConsumer(); - - Mono<ResponseEntity<String>> response = Mono.just(new ResponseEntity<>(messages, HttpStatus.OK)); - when(messageRouterConsumerMock.getForEntity(any())).thenReturn(response); + { + Mono<String> dmaapError = Mono.just("Non valid JSON \""); + Mono<String> dmaapResponse = Mono.just(dmaapRequestMessageString()); + doReturn(dmaapError, dmaapResponse).when(messageConsumerUnderTest).getFromMessageRouter(anyString()); + } - doReturn(messageHandlerMock).when(messageConsumerUnderTest).getDmaapMessageHandler(); + doReturn(Mono.just("response1")).when(messageConsumerUnderTest).handleDmaapMsg(any()); - messageConsumerUnderTest.start().join(); + String s = messageConsumerUnderTest.createTask().blockLast(); + assertEquals("response1", s); - ArgumentCaptor<DmaapRequestMessage> captor = ArgumentCaptor.forClass(DmaapRequestMessage.class); - verify(messageHandlerMock).handleDmaapMsg(captor.capture()); - DmaapRequestMessage messageAfterJsonParsing = captor.getValue(); - assertThat(messageAfterJsonParsing.apiVersion()).isNotEmpty(); + verify(messageConsumerUnderTest, times(2)).getFromMessageRouter(anyString()); + verify(messageConsumerUnderTest, times(1)).sendErrorResponse(anyString()); + verify(messageConsumerUnderTest, times(0)).delay(); + verify(messageConsumerUnderTest, times(1)).handleDmaapMsg(dmaapRequestMessage()); + } - verifyNoMoreInteractions(messageHandlerMock); + private String dmaapRequestMessageString() { + String json = gson.toJson(dmaapRequestMessage()); + return jsonArray(json); } @Test void testMessageParsing() throws ServiceException { messageConsumerUnderTest = new DmaapMessageConsumer(applicationConfigMock); - String json = gson.toJson(dmaapRequestMessage(Operation.PUT)); + String json = gson.toJson(dmaapRequestMessage()); { String jsonArrayOfObject = jsonArray(json); - List<DmaapRequestMessage> parsedMessage = messageConsumerUnderTest.parseMessages(jsonArrayOfObject); + DmaapRequestMessage parsedMessage = + messageConsumerUnderTest.parseReceivedMessage(jsonArrayOfObject).blockLast(); assertNotNull(parsedMessage); - assertTrue(parsedMessage.get(0).payload().isPresent()); + assertTrue(parsedMessage.payload().isPresent()); } { String jsonArrayOfString = jsonArray(quote(json)); - List<DmaapRequestMessage> parsedMessage = messageConsumerUnderTest.parseMessages(jsonArrayOfString); + DmaapRequestMessage parsedMessage = + messageConsumerUnderTest.parseReceivedMessage(jsonArrayOfString).blockLast(); assertNotNull(parsedMessage); - assertTrue(parsedMessage.get(0).payload().isPresent()); + assertTrue(parsedMessage.payload().isPresent()); } } - @Test - void incomingUnparsableRequest_thenSendResponse() throws Exception { - messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock)); - doReturn(messageHandlerMock).when(messageConsumerUnderTest).getDmaapMessageHandler(); - doReturn(Mono.just("OK")).when(messageHandlerMock).sendDmaapResponse(any(), any(), any()); - Exception actualException = - assertThrows(ServiceException.class, () -> messageConsumerUnderTest.parseMessages("[\"abc:\"def\"]")); - assertThat(actualException.getMessage()) - .contains("Could not parse incomming request. Reason :com.google.gson.stream.MalformedJsonException"); - - verify(messageHandlerMock).sendDmaapResponse(any(), any(), any()); - } - - @Test - void incomingUnparsableRequest_thenSendingResponseFailed() throws Exception { - messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock)); - doReturn(messageHandlerMock).when(messageConsumerUnderTest).getDmaapMessageHandler(); - doReturn(Mono.error(new Exception("Sending response failed"))).when(messageHandlerMock).sendDmaapResponse(any(), - any(), any()); - Exception actualException = - assertThrows(Exception.class, () -> messageConsumerUnderTest.parseMessages("[\"abc:\"def\"]")); - assertThat(actualException.getMessage()).contains("Sending response failed"); - - verify(messageHandlerMock).sendDmaapResponse(any(), any(), any()); - } - private void setUpMrConfig() { when(applicationConfigMock.getDmaapConsumerTopicUrl()).thenReturn("url"); when(applicationConfigMock.getDmaapProducerTopicUrl()).thenReturn("url"); @@ -237,11 +189,11 @@ class DmaapMessageConsumerTest { return "\"" + s.replace("\"", "\\\"") + "\""; } - private DmaapRequestMessage dmaapRequestMessage(Operation operation) { + private DmaapRequestMessage dmaapRequestMessage() { return ImmutableDmaapRequestMessage.builder() // .apiVersion("apiVersion") // .correlationId("correlationId") // - .operation(operation) // + .operation(Operation.PUT) // .originatorId("originatorId") // .payload(new JsonObject()) // .requestId("requestId") // diff --git a/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/dmaap/DmaapMessageHandlerTest.java b/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/dmaap/DmaapMessageHandlerTest.java index 99468811..df84ae05 100644 --- a/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/dmaap/DmaapMessageHandlerTest.java +++ b/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/dmaap/DmaapMessageHandlerTest.java @@ -110,7 +110,7 @@ class DmaapMessageHandlerTest { DmaapRequestMessage message = dmaapRequestMessage(Operation.DELETE); StepVerifier // - .create(testedObject.createTask(message)) // + .create(testedObject.handleDmaapMsg(message)) // .expectSubscription() // .expectNext("OK") // .verifyComplete(); // @@ -130,7 +130,7 @@ class DmaapMessageHandlerTest { DmaapRequestMessage message = dmaapRequestMessage(Operation.GET); StepVerifier // - .create(testedObject.createTask(message)) // + .create(testedObject.handleDmaapMsg(message)) // .expectSubscription() // .expectNext("OK") // .verifyComplete(); // @@ -152,7 +152,7 @@ class DmaapMessageHandlerTest { DmaapRequestMessage message = dmaapRequestMessage(Operation.GET); StepVerifier // - .create(testedObject.createTask(message)) // + .create(testedObject.handleDmaapMsg(message)) // .expectSubscription() // .verifyComplete(); // @@ -170,7 +170,7 @@ class DmaapMessageHandlerTest { DmaapRequestMessage message = dmaapRequestMessage(Operation.PUT); StepVerifier // - .create(testedObject.createTask(message)) // + .create(testedObject.handleDmaapMsg(message)) // .expectSubscription() // .expectNext("OK") // .verifyComplete(); // @@ -189,7 +189,7 @@ class DmaapMessageHandlerTest { DmaapRequestMessage message = dmaapRequestMessage(Operation.POST); StepVerifier // - .create(testedObject.createTask(message)) // + .create(testedObject.handleDmaapMsg(message)) // .expectSubscription() // .expectNext("OK") // .verifyComplete(); // @@ -208,7 +208,7 @@ class DmaapMessageHandlerTest { doReturn(Mono.just("OK")).when(dmaapClient).post(anyString(), anyString()); DmaapRequestMessage message = dmaapRequestMessage(Operation.PUT); - testedObject.createTask(message).block(); + testedObject.handleDmaapMsg(message).block(); verify(pmsClient).putForEntity(anyString(), anyString()); verifyNoMoreInteractions(pmsClient); @@ -239,7 +239,8 @@ class DmaapMessageHandlerTest { final ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(DmaapMessageHandler.class, WARN); - testedObject.handleDmaapMsg(message); + doReturn(notOkResponse()).when(pmsClient).putForEntity(anyString(), anyString()); + testedObject.handleDmaapMsg(message).block(); assertThat(logAppender.list.get(0).getFormattedMessage()) .startsWith("Expected payload in message from DMAAP: "); diff --git a/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RefreshConfigTaskTest.java b/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RefreshConfigTaskTest.java index 2e96b680..4cc23607 100644 --- a/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RefreshConfigTaskTest.java +++ b/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RefreshConfigTaskTest.java @@ -134,17 +134,6 @@ class RefreshConfigTaskTest { } @Test - void stop_thenTaskIsDisposed() throws Exception { - refreshTaskUnderTest = this.createTestObject(CONFIG_FILE_DOES_NOT_EXIST, null, null, false); - refreshTaskUnderTest.systemEnvironment = new Properties(); - - refreshTaskUnderTest.start(); - refreshTaskUnderTest.stop(); - - assertThat(refreshTaskUnderTest.getRefreshTask().isDisposed()).as("Refresh task is disposed").isTrue(); - } - - @Test void whenTheConfigurationFits_thenConfiguredRicsArePutInRepository() throws Exception { refreshTaskUnderTest = this.createTestObject(CONFIG_FILE_EXISTS); refreshTaskUnderTest.systemEnvironment = new Properties(); |