diff options
author | KAPIL SINGAL <ks220y@att.com> | 2020-08-27 12:43:12 +0000 |
---|---|---|
committer | Gerrit Code Review <gerrit@onap.org> | 2020-08-27 12:43:12 +0000 |
commit | 9bb32130eed83b9116f3e5d6f51a5a386c2fbe49 (patch) | |
tree | 1f306de80f3d71bf6f55e60c269b749bd8d3891a | |
parent | 7e634df0672f4ed7e6a89c373dfab08439cea630 (diff) | |
parent | cbe6e68610bb477d64d776df2fbf0eccece533eb (diff) |
Merge "DMAAP Improvements"
6 files changed, 167 insertions, 133 deletions
diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/dmaap/DmaapMessageConsumer.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/dmaap/DmaapMessageConsumer.java index 37a092bb..a4da4bdd 100644 --- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/dmaap/DmaapMessageConsumer.java +++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/dmaap/DmaapMessageConsumer.java @@ -21,13 +21,18 @@ package org.onap.ccsdk.oran.a1policymanagementservice.dmaap; import com.google.common.collect.Iterables; +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; import com.google.gson.JsonArray; import com.google.gson.JsonElement; import com.google.gson.JsonParser; +import com.google.gson.TypeAdapterFactory; import java.time.Duration; import java.util.ArrayList; import java.util.List; +import java.util.Optional; +import java.util.ServiceLoader; import org.onap.ccsdk.oran.a1policymanagementservice.clients.AsyncRestClient; import org.onap.ccsdk.oran.a1policymanagementservice.configuration.ApplicationConfig; @@ -36,6 +41,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; +import org.springframework.http.HttpStatus; import org.springframework.http.ResponseEntity; import org.springframework.stereotype.Component; @@ -63,12 +69,17 @@ public class DmaapMessageConsumer { private DmaapMessageHandler dmaapMessageHandler = null; + private final Gson gson; + @Value("${server.http-port}") private int localServerHttpPort; @Autowired public DmaapMessageConsumer(ApplicationConfig applicationConfig) { this.applicationConfig = applicationConfig; + GsonBuilder gsonBuilder = new GsonBuilder(); + ServiceLoader.load(TypeAdapterFactory.class).forEach(gsonBuilder::registerTypeAdapterFactory); + gson = gsonBuilder.create(); } /** @@ -87,10 +98,10 @@ public class DmaapMessageConsumer { while (!isStopped()) { try { if (isDmaapConfigured()) { - Iterable<String> dmaapMsgs = fetchAllMessages(); + Iterable<DmaapRequestMessage> dmaapMsgs = fetchAllMessages(); if (dmaapMsgs != null && Iterables.size(dmaapMsgs) > 0) { logger.debug("Fetched all the messages from DMAAP and will start to process the messages"); - for (String msg : dmaapMsgs) { + for (DmaapRequestMessage msg : dmaapMsgs) { processMsg(msg); } } @@ -115,21 +126,47 @@ public class DmaapMessageConsumer { && !consumerTopicUrl.isEmpty()); } - private static List<String> parseMessages(String jsonString) { - JsonArray arrayOfMessages = JsonParser.parseString(jsonString).getAsJsonArray(); - List<String> result = new ArrayList<>(); - for (JsonElement element : arrayOfMessages) { - if (element.isJsonPrimitive()) { - result.add(element.getAsString()); + private <T> List<T> parseList(String jsonString, Class<T> clazz) { + List<T> result = new ArrayList<>(); + JsonArray jsonArr = JsonParser.parseString(jsonString).getAsJsonArray(); + for (JsonElement jsonElement : jsonArr) { + // The element can either be a JsonObject or a JsonString + if (jsonElement.isJsonPrimitive()) { + T json = gson.fromJson(jsonElement.getAsString(), clazz); + result.add(json); } else { - String messageAsString = element.toString(); - result.add(messageAsString); + T json = gson.fromJson(jsonElement.toString(), clazz); + result.add(json); } } return result; } - protected Iterable<String> fetchAllMessages() throws ServiceException { + private void sendErrorResponse(String response) { + DmaapRequestMessage fakeRequest = ImmutableDmaapRequestMessage.builder() // + .apiVersion("") // + .correlationId("") // + .operation(DmaapRequestMessage.Operation.PUT) // + .originatorId("") // + .payload(Optional.empty()) // + .requestId("") // + .target("") // + .timestamp("") // + .url("URL") // + .build(); + getDmaapMessageHandler().sendDmaapResponse(response, fakeRequest, HttpStatus.BAD_REQUEST).block(); + } + + List<DmaapRequestMessage> parseMessages(String jsonString) throws ServiceException { + try { + return parseList(jsonString, DmaapRequestMessage.class); + } catch (Exception e) { + sendErrorResponse("Not parsable request received, reason:" + e.toString() + ", input :" + jsonString); + throw new ServiceException("Could not parse incomming request. Reason :" + e.getMessage()); + } + } + + protected Iterable<DmaapRequestMessage> fetchAllMessages() throws ServiceException { String topicUrl = this.applicationConfig.getDmaapConsumerTopicUrl(); AsyncRestClient consumer = getMessageRouterConsumer(); ResponseEntity<String> response = consumer.getForEntity(topicUrl).block(); @@ -142,7 +179,7 @@ public class DmaapMessageConsumer { } } - private void processMsg(String msg) { + private void processMsg(DmaapRequestMessage msg) { logger.debug("Message Reveived from DMAAP : {}", msg); getDmaapMessageHandler().handleDmaapMsg(msg); } diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/dmaap/DmaapMessageHandler.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/dmaap/DmaapMessageHandler.java index 040d8b3e..2d7b5063 100644 --- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/dmaap/DmaapMessageHandler.java +++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/dmaap/DmaapMessageHandler.java @@ -40,12 +40,12 @@ import reactor.core.publisher.Mono; /** * The class handles incoming requests from DMAAP. * <p> - * That means: invoke a REST call towards this services and to send back a response though DMAAP + * That means: invoke a REST call towards this services and to send back a + * response though DMAAP */ public class DmaapMessageHandler { private static final Logger logger = LoggerFactory.getLogger(DmaapMessageHandler.class); - private static Gson gson = new GsonBuilder() // - .create(); // + private static Gson gson = new GsonBuilder().create(); private final AsyncRestClient dmaapClient; private final AsyncRestClient pmsClient; @@ -54,7 +54,7 @@ public class DmaapMessageHandler { this.dmaapClient = dmaapClient; } - public void handleDmaapMsg(String msg) { + public void handleDmaapMsg(DmaapRequestMessage msg) { try { String result = this.createTask(msg).block(); logger.debug("handleDmaapMsg: {}", result); @@ -63,17 +63,10 @@ public class DmaapMessageHandler { } } - Mono<String> createTask(String msg) { - try { - DmaapRequestMessage dmaapRequestMessage = gson.fromJson(msg, ImmutableDmaapRequestMessage.class); - return this.invokePolicyManagementService(dmaapRequestMessage) // - .onErrorResume(t -> handlePolicyManagementServiceCallError(t, dmaapRequestMessage)) // - .flatMap( - response -> sendDmaapResponse(response.getBody(), dmaapRequestMessage, response.getStatusCode())); - } catch (Exception e) { - String errorMsg = "Received unparsable message from DMAAP: \"" + msg + "\", reason: " + e.getMessage(); - return Mono.error(new ServiceException(errorMsg)); // Cannot make any response - } + Mono<String> createTask(DmaapRequestMessage dmaapRequestMessage) { + return this.invokePolicyManagementService(dmaapRequestMessage) // + .onErrorResume(t -> handlePolicyManagementServiceCallError(t, dmaapRequestMessage)) // + .flatMap(response -> sendDmaapResponse(response.getBody(), dmaapRequestMessage, response.getStatusCode())); } private Mono<ResponseEntity<String>> handlePolicyManagementServiceCallError(Throwable error, @@ -95,6 +88,12 @@ public class DmaapMessageHandler { .flatMap(notUsed -> Mono.empty()); } + public Mono<String> sendDmaapResponse(String response, DmaapRequestMessage dmaapRequestMessage, HttpStatus status) { + return createDmaapResponseMessage(dmaapRequestMessage, response, status) // + .flatMap(this::sendToDmaap) // + .onErrorResume(this::handleResponseCallError); + } + private Mono<ResponseEntity<String>> invokePolicyManagementService(DmaapRequestMessage dmaapRequestMessage) { DmaapRequestMessage.Operation operation = dmaapRequestMessage.operation(); String uri = dmaapRequestMessage.url(); @@ -122,20 +121,13 @@ public class DmaapMessageHandler { } } - private Mono<String> sendDmaapResponse(String response, DmaapRequestMessage dmaapRequestMessage, - HttpStatus status) { - return createDmaapResponseMessage(dmaapRequestMessage, response, status) // - .flatMap(this::sendToDmaap) // - .onErrorResume(this::handleResponseCallError); - } - private Mono<String> sendToDmaap(String body) { logger.debug("sendToDmaap: {} ", body); return dmaapClient.post("", "[" + body + "]"); } private Mono<String> handleResponseCallError(Throwable t) { - logger.debug("Failed to send response to DMaaP: {}", t.getMessage()); + logger.warn("Failed to send response to DMaaP: {}", t.getMessage()); return Mono.empty(); } @@ -152,6 +144,5 @@ public class DmaapMessageHandler { .build(); String str = gson.toJson(dmaapResponseMessage); return Mono.just(str); - } } diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RefreshConfigTask.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RefreshConfigTask.java index 57d6969f..248ba32d 100644 --- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RefreshConfigTask.java +++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RefreshConfigTask.java @@ -20,11 +20,9 @@ package org.onap.ccsdk.oran.a1policymanagementservice.tasks; -import com.google.gson.GsonBuilder; import com.google.gson.JsonElement; import com.google.gson.JsonObject; import com.google.gson.JsonParser; -import com.google.gson.TypeAdapterFactory; import java.io.BufferedInputStream; import java.io.File; @@ -34,7 +32,6 @@ import java.io.InputStream; import java.io.InputStreamReader; import java.time.Duration; import java.util.Properties; -import java.util.ServiceLoader; import javax.validation.constraints.NotNull; @@ -239,9 +236,6 @@ public class RefreshConfigTask { return Flux.empty(); } - GsonBuilder gsonBuilder = new GsonBuilder(); - ServiceLoader.load(TypeAdapterFactory.class).forEach(gsonBuilder::registerTypeAdapterFactory); - try (InputStream inputStream = createInputStream(filepath)) { JsonObject rootObject = getJsonElement(inputStream).getAsJsonObject(); ApplicationConfigParser appParser = new ApplicationConfigParser(); diff --git a/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/SdncOscA1ClientTest.java b/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/SdncOscA1ClientTest.java index 08e99847..1e9695c7 100644 --- a/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/SdncOscA1ClientTest.java +++ b/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/SdncOscA1ClientTest.java @@ -20,7 +20,6 @@ package org.onap.ccsdk.oran.a1policymanagementservice.clients; -import static org.junit.Assert.fail; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.ArgumentMatchers.anyString; diff --git a/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/dmaap/DmaapMessageConsumerTest.java b/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/dmaap/DmaapMessageConsumerTest.java index cc9efa07..2bd8c7f4 100644 --- a/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/dmaap/DmaapMessageConsumerTest.java +++ b/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/dmaap/DmaapMessageConsumerTest.java @@ -22,6 +22,9 @@ package org.onap.ccsdk.oran.a1policymanagementservice.dmaap; import static ch.qos.logback.classic.Level.WARN; import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doReturn; @@ -34,8 +37,13 @@ import static org.mockito.Mockito.when; import ch.qos.logback.classic.spi.ILoggingEvent; import ch.qos.logback.core.read.ListAppender; +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import com.google.gson.JsonObject; + import java.time.Duration; import java.util.LinkedList; +import java.util.List; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Test; @@ -46,6 +54,8 @@ import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; import org.onap.ccsdk.oran.a1policymanagementservice.clients.AsyncRestClient; import org.onap.ccsdk.oran.a1policymanagementservice.configuration.ApplicationConfig; +import org.onap.ccsdk.oran.a1policymanagementservice.dmaap.DmaapRequestMessage.Operation; +import org.onap.ccsdk.oran.a1policymanagementservice.exceptions.ServiceException; import org.onap.ccsdk.oran.a1policymanagementservice.utils.LoggingUtils; import org.springframework.http.HttpStatus; import org.springframework.http.ResponseEntity; @@ -62,6 +72,8 @@ class DmaapMessageConsumerTest { private DmaapMessageConsumer messageConsumerUnderTest; + private Gson gson = new GsonBuilder().create(); + @AfterEach void resetLogging() { LoggingUtils.getLogListAppender(DmaapMessageConsumer.class); @@ -147,17 +159,7 @@ class DmaapMessageConsumerTest { setUpMrConfig(); messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock)); - String message = "{\"apiVersion\":\"1.0\"," // - + "\"operation\":\"GET\"," // - + "\"correlationId\":\"1592341013115594000\"," // - + "\"originatorId\":\"849e6c6b420\"," // - + "\"payload\":{}," // - + "\"requestId\":\"23343221\", " // - + "\"target\":\"policy-management-service\"," // - + "\"timestamp\":\"2020-06-16 20:56:53.115665\"," // - + "\"type\":\"request\"," // - + "\"url\":\"/rics\"}"; - String messages = "[" + message + "]"; + String messages = jsonArray(gson.toJson(dmaapRequestMessage(Operation.PUT))); doReturn(false, true).when(messageConsumerUnderTest).isStopped(); doReturn(messageRouterConsumerMock).when(messageConsumerUnderTest).getMessageRouterConsumer(); @@ -169,37 +171,84 @@ class DmaapMessageConsumerTest { messageConsumerUnderTest.start().join(); - ArgumentCaptor<String> captor = ArgumentCaptor.forClass(String.class); + ArgumentCaptor<DmaapRequestMessage> captor = ArgumentCaptor.forClass(DmaapRequestMessage.class); verify(messageHandlerMock).handleDmaapMsg(captor.capture()); - String messageAfterJsonParsing = captor.getValue(); - assertThat(messageAfterJsonParsing).contains("apiVersion"); + DmaapRequestMessage messageAfterJsonParsing = captor.getValue(); + assertThat(messageAfterJsonParsing.apiVersion()).isNotEmpty(); verifyNoMoreInteractions(messageHandlerMock); } @Test - void dmaapConfiguredAndOneMessage_thenPollOnceAndProcessMessage2() throws Exception { - // The message from MR is here an array of String (which is the case when the MR - // simulator is used) - setUpMrConfig(); - messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock)); - - doReturn(false, true).when(messageConsumerUnderTest).isStopped(); - doReturn(messageRouterConsumerMock).when(messageConsumerUnderTest).getMessageRouterConsumer(); + void testMessageParsing() throws ServiceException { + messageConsumerUnderTest = new DmaapMessageConsumer(applicationConfigMock); + String json = gson.toJson(dmaapRequestMessage(Operation.PUT)); + { + String jsonArrayOfObject = jsonArray(json); + List<DmaapRequestMessage> parsedMessage = messageConsumerUnderTest.parseMessages(jsonArrayOfObject); + assertNotNull(parsedMessage); + assertTrue(parsedMessage.get(0).payload().isPresent()); + } + { + String jsonArrayOfString = jsonArray(quote(json)); + List<DmaapRequestMessage> parsedMessage = messageConsumerUnderTest.parseMessages(jsonArrayOfString); + assertNotNull(parsedMessage); + assertTrue(parsedMessage.get(0).payload().isPresent()); + } - Mono<ResponseEntity<String>> response = Mono.just(new ResponseEntity<>("[\"aMessage\"]", HttpStatus.OK)); - when(messageRouterConsumerMock.getForEntity(any())).thenReturn(response); + } + @Test + void incomingUnparsableRequest_thenSendResponse() throws Exception { + messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock)); doReturn(messageHandlerMock).when(messageConsumerUnderTest).getDmaapMessageHandler(); + doReturn(Mono.just("OK")).when(messageHandlerMock).sendDmaapResponse(any(), any(), any()); + Exception actualException = + assertThrows(ServiceException.class, () -> messageConsumerUnderTest.parseMessages("[\"abc:\"def\"]")); + assertThat(actualException.getMessage()) + .contains("Could not parse incomming request. Reason :com.google.gson.stream.MalformedJsonException"); - messageConsumerUnderTest.start().join(); + verify(messageHandlerMock).sendDmaapResponse(any(), any(), any()); + } - verify(messageHandlerMock).handleDmaapMsg("aMessage"); - verifyNoMoreInteractions(messageHandlerMock); + @Test + void incomingUnparsableRequest_thenSendingResponseFailed() throws Exception { + messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock)); + doReturn(messageHandlerMock).when(messageConsumerUnderTest).getDmaapMessageHandler(); + doReturn(Mono.error(new Exception("Sending response failed"))).when(messageHandlerMock).sendDmaapResponse(any(), + any(), any()); + Exception actualException = + assertThrows(Exception.class, () -> messageConsumerUnderTest.parseMessages("[\"abc:\"def\"]")); + assertThat(actualException.getMessage()).contains("Sending response failed"); + + verify(messageHandlerMock).sendDmaapResponse(any(), any(), any()); } private void setUpMrConfig() { when(applicationConfigMock.getDmaapConsumerTopicUrl()).thenReturn("url"); when(applicationConfigMock.getDmaapProducerTopicUrl()).thenReturn("url"); } + + private String jsonArray(String s) { + return "[" + s + "]"; + } + + private String quote(String s) { + return "\"" + s.replace("\"", "\\\"") + "\""; + } + + private DmaapRequestMessage dmaapRequestMessage(Operation operation) { + return ImmutableDmaapRequestMessage.builder() // + .apiVersion("apiVersion") // + .correlationId("correlationId") // + .operation(operation) // + .originatorId("originatorId") // + .payload(new JsonObject()) // + .requestId("requestId") // + .target("target") // + .timestamp("timestamp") // + .url("URL") // + .build(); + } + } diff --git a/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/dmaap/DmaapMessageHandlerTest.java b/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/dmaap/DmaapMessageHandlerTest.java index 2405e46c..3656ec1a 100644 --- a/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/dmaap/DmaapMessageHandlerTest.java +++ b/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/dmaap/DmaapMessageHandlerTest.java @@ -22,9 +22,6 @@ package org.onap.ccsdk.oran.a1policymanagementservice.dmaap; import static ch.qos.logback.classic.Level.WARN; import static org.assertj.core.api.Assertions.assertThat; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; -import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; @@ -66,19 +63,18 @@ class DmaapMessageHandlerTest { private final AsyncRestClient dmaapClient = mock(AsyncRestClient.class); private final AsyncRestClient pmsClient = mock(AsyncRestClient.class); private DmaapMessageHandler testedObject; - private static Gson gson = new GsonBuilder() // - .create(); // + private Gson gson = new GsonBuilder().create(); // @BeforeEach private void setUp() throws Exception { testedObject = spy(new DmaapMessageHandler(dmaapClient, pmsClient)); } - static JsonObject payloadAsJson() { + JsonObject payloadAsJson() { return gson.fromJson(payloadAsString(), JsonObject.class); } - static String payloadAsString() { + String payloadAsString() { return "{\"param\":\"value\"}"; } @@ -99,10 +95,6 @@ class DmaapMessageHandlerTest { .build(); } - private String dmaapInputMessage(Operation operation) { - return gson.toJson(dmaapRequestMessage(operation)); - } - private Mono<ResponseEntity<String>> okResponse() { ResponseEntity<String> entity = new ResponseEntity<>("OK", HttpStatus.OK); return Mono.just(entity); @@ -114,39 +106,11 @@ class DmaapMessageHandlerTest { } @Test - void testMessageParsing() { - String message = dmaapInputMessage(Operation.DELETE); - logger.info(message); - DmaapRequestMessage parsedMessage = gson.fromJson(message, ImmutableDmaapRequestMessage.class); - assertNotNull(parsedMessage); - assertFalse(parsedMessage.payload().isPresent()); - - message = dmaapInputMessage(Operation.PUT); - logger.info(message); - parsedMessage = gson.fromJson(message, ImmutableDmaapRequestMessage.class); - assertNotNull(parsedMessage); - assertTrue(parsedMessage.payload().isPresent()); - } - - @Test - void unparseableMessage_thenWarning() { - final ListAppender<ILoggingEvent> logAppender = - LoggingUtils.getLogListAppender(DmaapMessageHandler.class, WARN); - - String msg = "bad message"; - testedObject.handleDmaapMsg(msg); - - assertThat(logAppender.list.get(0).getFormattedMessage()).startsWith( - "handleDmaapMsg failure org.onap.ccsdk.oran.a1policymanagementservice.exceptions.ServiceException: Received unparsable " - + "message from DMAAP: \"" + msg + "\", reason: "); - } - - @Test void successfulDelete() throws IOException { doReturn(okResponse()).when(pmsClient).deleteForEntity(anyString()); doReturn(Mono.just("OK")).when(dmaapClient).post(anyString(), anyString()); - String message = dmaapInputMessage(Operation.DELETE); + DmaapRequestMessage message = dmaapRequestMessage(Operation.DELETE); StepVerifier // .create(testedObject.createTask(message)) // @@ -167,8 +131,9 @@ class DmaapMessageHandlerTest { doReturn(okResponse()).when(pmsClient).getForEntity(anyString()); doReturn(Mono.just("OK")).when(dmaapClient).post(anyString(), anyString()); + DmaapRequestMessage message = dmaapRequestMessage(Operation.GET); StepVerifier // - .create(testedObject.createTask(dmaapInputMessage(Operation.GET))) // + .create(testedObject.createTask(message)) // .expectSubscription() // .expectNext("OK") // .verifyComplete(); // @@ -188,8 +153,9 @@ class DmaapMessageHandlerTest { doReturn(Mono.error(webClientResponseException)).when(pmsClient).getForEntity(anyString()); doReturn(Mono.just("OK")).when(dmaapClient).post(anyString(), anyString()); + DmaapRequestMessage message = dmaapRequestMessage(Operation.GET); StepVerifier // - .create(testedObject.createTask(dmaapInputMessage(Operation.GET))) // + .create(testedObject.createTask(message)) // .expectSubscription() // .verifyComplete(); // @@ -205,8 +171,9 @@ class DmaapMessageHandlerTest { doReturn(okResponse()).when(pmsClient).putForEntity(anyString(), anyString()); doReturn(Mono.just("OK")).when(dmaapClient).post(anyString(), anyString()); + DmaapRequestMessage message = dmaapRequestMessage(Operation.PUT); StepVerifier // - .create(testedObject.createTask(dmaapInputMessage(Operation.PUT))) // + .create(testedObject.createTask(message)) // .expectSubscription() // .expectNext("OK") // .verifyComplete(); // @@ -223,8 +190,9 @@ class DmaapMessageHandlerTest { doReturn(okResponse()).when(pmsClient).postForEntity(anyString(), anyString()); doReturn(Mono.just("OK")).when(dmaapClient).post(anyString(), anyString()); + DmaapRequestMessage message = dmaapRequestMessage(Operation.POST); StepVerifier // - .create(testedObject.createTask(dmaapInputMessage(Operation.POST))) // + .create(testedObject.createTask(message)) // .expectSubscription() // .expectNext("OK") // .verifyComplete(); // @@ -237,12 +205,13 @@ class DmaapMessageHandlerTest { } @Test - void exceptionWhenCallingPms_thenNotFoundResponse() throws IOException { + void exceptionWhenCallingPms_thenErrorResponse() throws IOException { doReturn(notOkResponse()).when(pmsClient).putForEntity(anyString(), anyString()); doReturn(Mono.just("OK")).when(dmaapClient).post(anyString(), anyString()); - testedObject.createTask(dmaapInputMessage(Operation.PUT)).block(); + DmaapRequestMessage message = dmaapRequestMessage(Operation.PUT); + testedObject.createTask(message).block(); verify(pmsClient).putForEntity(anyString(), anyString()); verifyNoMoreInteractions(pmsClient); @@ -257,24 +226,18 @@ class DmaapMessageHandlerTest { } @Test - void unsupportedOperationInMessage_thenNotFoundResponseWithNotImplementedOperation() throws Exception { - String message = dmaapInputMessage(Operation.PUT).toString(); - String badOperation = "BAD"; - message = message.replace(Operation.PUT.toString(), badOperation); - - testedObject.handleDmaapMsg(message); - - ArgumentCaptor<String> captor = ArgumentCaptor.forClass(String.class); - verify(dmaapClient).post(anyString(), captor.capture()); - String actualMessage = captor.getValue(); - assertThat(actualMessage).contains("Not implemented operation") // - .contains("BAD_REQUEST"); - } - - @Test void putWithoutPayload_thenNotFoundResponseWithWarning() throws Exception { - String message = dmaapInputMessage(Operation.PUT).toString(); - message = message.replace("payload", "junk"); + DmaapRequestMessage message = ImmutableDmaapRequestMessage.builder() // + .apiVersion("apiVersion") // + .correlationId("correlationId") // + .operation(DmaapRequestMessage.Operation.PUT) // + .originatorId("originatorId") // + .payload(Optional.empty()) // + .requestId("requestId") // + .target("target") // + .timestamp("timestamp") // + .url(URL) // + .build(); final ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(DmaapMessageHandler.class, WARN); @@ -284,4 +247,5 @@ class DmaapMessageHandlerTest { assertThat(logAppender.list.get(0).getFormattedMessage()) .startsWith("Expected payload in message from DMAAP: "); } + } |