summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorKAPIL SINGAL <ks220y@att.com>2020-08-27 12:43:12 +0000
committerGerrit Code Review <gerrit@onap.org>2020-08-27 12:43:12 +0000
commit9bb32130eed83b9116f3e5d6f51a5a386c2fbe49 (patch)
tree1f306de80f3d71bf6f55e60c269b749bd8d3891a
parent7e634df0672f4ed7e6a89c373dfab08439cea630 (diff)
parentcbe6e68610bb477d64d776df2fbf0eccece533eb (diff)
Merge "DMAAP Improvements"
-rw-r--r--a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/dmaap/DmaapMessageConsumer.java61
-rw-r--r--a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/dmaap/DmaapMessageHandler.java39
-rw-r--r--a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RefreshConfigTask.java6
-rw-r--r--a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/SdncOscA1ClientTest.java1
-rw-r--r--a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/dmaap/DmaapMessageConsumerTest.java103
-rw-r--r--a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/dmaap/DmaapMessageHandlerTest.java90
6 files changed, 167 insertions, 133 deletions
diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/dmaap/DmaapMessageConsumer.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/dmaap/DmaapMessageConsumer.java
index 37a092bb..a4da4bdd 100644
--- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/dmaap/DmaapMessageConsumer.java
+++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/dmaap/DmaapMessageConsumer.java
@@ -21,13 +21,18 @@
package org.onap.ccsdk.oran.a1policymanagementservice.dmaap;
import com.google.common.collect.Iterables;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonParser;
+import com.google.gson.TypeAdapterFactory;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
+import java.util.Optional;
+import java.util.ServiceLoader;
import org.onap.ccsdk.oran.a1policymanagementservice.clients.AsyncRestClient;
import org.onap.ccsdk.oran.a1policymanagementservice.configuration.ApplicationConfig;
@@ -36,6 +41,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
+import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Component;
@@ -63,12 +69,17 @@ public class DmaapMessageConsumer {
private DmaapMessageHandler dmaapMessageHandler = null;
+ private final Gson gson;
+
@Value("${server.http-port}")
private int localServerHttpPort;
@Autowired
public DmaapMessageConsumer(ApplicationConfig applicationConfig) {
this.applicationConfig = applicationConfig;
+ GsonBuilder gsonBuilder = new GsonBuilder();
+ ServiceLoader.load(TypeAdapterFactory.class).forEach(gsonBuilder::registerTypeAdapterFactory);
+ gson = gsonBuilder.create();
}
/**
@@ -87,10 +98,10 @@ public class DmaapMessageConsumer {
while (!isStopped()) {
try {
if (isDmaapConfigured()) {
- Iterable<String> dmaapMsgs = fetchAllMessages();
+ Iterable<DmaapRequestMessage> dmaapMsgs = fetchAllMessages();
if (dmaapMsgs != null && Iterables.size(dmaapMsgs) > 0) {
logger.debug("Fetched all the messages from DMAAP and will start to process the messages");
- for (String msg : dmaapMsgs) {
+ for (DmaapRequestMessage msg : dmaapMsgs) {
processMsg(msg);
}
}
@@ -115,21 +126,47 @@ public class DmaapMessageConsumer {
&& !consumerTopicUrl.isEmpty());
}
- private static List<String> parseMessages(String jsonString) {
- JsonArray arrayOfMessages = JsonParser.parseString(jsonString).getAsJsonArray();
- List<String> result = new ArrayList<>();
- for (JsonElement element : arrayOfMessages) {
- if (element.isJsonPrimitive()) {
- result.add(element.getAsString());
+ private <T> List<T> parseList(String jsonString, Class<T> clazz) {
+ List<T> result = new ArrayList<>();
+ JsonArray jsonArr = JsonParser.parseString(jsonString).getAsJsonArray();
+ for (JsonElement jsonElement : jsonArr) {
+ // The element can either be a JsonObject or a JsonString
+ if (jsonElement.isJsonPrimitive()) {
+ T json = gson.fromJson(jsonElement.getAsString(), clazz);
+ result.add(json);
} else {
- String messageAsString = element.toString();
- result.add(messageAsString);
+ T json = gson.fromJson(jsonElement.toString(), clazz);
+ result.add(json);
}
}
return result;
}
- protected Iterable<String> fetchAllMessages() throws ServiceException {
+ private void sendErrorResponse(String response) {
+ DmaapRequestMessage fakeRequest = ImmutableDmaapRequestMessage.builder() //
+ .apiVersion("") //
+ .correlationId("") //
+ .operation(DmaapRequestMessage.Operation.PUT) //
+ .originatorId("") //
+ .payload(Optional.empty()) //
+ .requestId("") //
+ .target("") //
+ .timestamp("") //
+ .url("URL") //
+ .build();
+ getDmaapMessageHandler().sendDmaapResponse(response, fakeRequest, HttpStatus.BAD_REQUEST).block();
+ }
+
+ List<DmaapRequestMessage> parseMessages(String jsonString) throws ServiceException {
+ try {
+ return parseList(jsonString, DmaapRequestMessage.class);
+ } catch (Exception e) {
+ sendErrorResponse("Not parsable request received, reason:" + e.toString() + ", input :" + jsonString);
+ throw new ServiceException("Could not parse incomming request. Reason :" + e.getMessage());
+ }
+ }
+
+ protected Iterable<DmaapRequestMessage> fetchAllMessages() throws ServiceException {
String topicUrl = this.applicationConfig.getDmaapConsumerTopicUrl();
AsyncRestClient consumer = getMessageRouterConsumer();
ResponseEntity<String> response = consumer.getForEntity(topicUrl).block();
@@ -142,7 +179,7 @@ public class DmaapMessageConsumer {
}
}
- private void processMsg(String msg) {
+ private void processMsg(DmaapRequestMessage msg) {
logger.debug("Message Reveived from DMAAP : {}", msg);
getDmaapMessageHandler().handleDmaapMsg(msg);
}
diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/dmaap/DmaapMessageHandler.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/dmaap/DmaapMessageHandler.java
index 040d8b3e..2d7b5063 100644
--- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/dmaap/DmaapMessageHandler.java
+++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/dmaap/DmaapMessageHandler.java
@@ -40,12 +40,12 @@ import reactor.core.publisher.Mono;
/**
* The class handles incoming requests from DMAAP.
* <p>
- * That means: invoke a REST call towards this services and to send back a response though DMAAP
+ * That means: invoke a REST call towards this services and to send back a
+ * response though DMAAP
*/
public class DmaapMessageHandler {
private static final Logger logger = LoggerFactory.getLogger(DmaapMessageHandler.class);
- private static Gson gson = new GsonBuilder() //
- .create(); //
+ private static Gson gson = new GsonBuilder().create();
private final AsyncRestClient dmaapClient;
private final AsyncRestClient pmsClient;
@@ -54,7 +54,7 @@ public class DmaapMessageHandler {
this.dmaapClient = dmaapClient;
}
- public void handleDmaapMsg(String msg) {
+ public void handleDmaapMsg(DmaapRequestMessage msg) {
try {
String result = this.createTask(msg).block();
logger.debug("handleDmaapMsg: {}", result);
@@ -63,17 +63,10 @@ public class DmaapMessageHandler {
}
}
- Mono<String> createTask(String msg) {
- try {
- DmaapRequestMessage dmaapRequestMessage = gson.fromJson(msg, ImmutableDmaapRequestMessage.class);
- return this.invokePolicyManagementService(dmaapRequestMessage) //
- .onErrorResume(t -> handlePolicyManagementServiceCallError(t, dmaapRequestMessage)) //
- .flatMap(
- response -> sendDmaapResponse(response.getBody(), dmaapRequestMessage, response.getStatusCode()));
- } catch (Exception e) {
- String errorMsg = "Received unparsable message from DMAAP: \"" + msg + "\", reason: " + e.getMessage();
- return Mono.error(new ServiceException(errorMsg)); // Cannot make any response
- }
+ Mono<String> createTask(DmaapRequestMessage dmaapRequestMessage) {
+ return this.invokePolicyManagementService(dmaapRequestMessage) //
+ .onErrorResume(t -> handlePolicyManagementServiceCallError(t, dmaapRequestMessage)) //
+ .flatMap(response -> sendDmaapResponse(response.getBody(), dmaapRequestMessage, response.getStatusCode()));
}
private Mono<ResponseEntity<String>> handlePolicyManagementServiceCallError(Throwable error,
@@ -95,6 +88,12 @@ public class DmaapMessageHandler {
.flatMap(notUsed -> Mono.empty());
}
+ public Mono<String> sendDmaapResponse(String response, DmaapRequestMessage dmaapRequestMessage, HttpStatus status) {
+ return createDmaapResponseMessage(dmaapRequestMessage, response, status) //
+ .flatMap(this::sendToDmaap) //
+ .onErrorResume(this::handleResponseCallError);
+ }
+
private Mono<ResponseEntity<String>> invokePolicyManagementService(DmaapRequestMessage dmaapRequestMessage) {
DmaapRequestMessage.Operation operation = dmaapRequestMessage.operation();
String uri = dmaapRequestMessage.url();
@@ -122,20 +121,13 @@ public class DmaapMessageHandler {
}
}
- private Mono<String> sendDmaapResponse(String response, DmaapRequestMessage dmaapRequestMessage,
- HttpStatus status) {
- return createDmaapResponseMessage(dmaapRequestMessage, response, status) //
- .flatMap(this::sendToDmaap) //
- .onErrorResume(this::handleResponseCallError);
- }
-
private Mono<String> sendToDmaap(String body) {
logger.debug("sendToDmaap: {} ", body);
return dmaapClient.post("", "[" + body + "]");
}
private Mono<String> handleResponseCallError(Throwable t) {
- logger.debug("Failed to send response to DMaaP: {}", t.getMessage());
+ logger.warn("Failed to send response to DMaaP: {}", t.getMessage());
return Mono.empty();
}
@@ -152,6 +144,5 @@ public class DmaapMessageHandler {
.build();
String str = gson.toJson(dmaapResponseMessage);
return Mono.just(str);
-
}
}
diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RefreshConfigTask.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RefreshConfigTask.java
index 57d6969f..248ba32d 100644
--- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RefreshConfigTask.java
+++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RefreshConfigTask.java
@@ -20,11 +20,9 @@
package org.onap.ccsdk.oran.a1policymanagementservice.tasks;
-import com.google.gson.GsonBuilder;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
-import com.google.gson.TypeAdapterFactory;
import java.io.BufferedInputStream;
import java.io.File;
@@ -34,7 +32,6 @@ import java.io.InputStream;
import java.io.InputStreamReader;
import java.time.Duration;
import java.util.Properties;
-import java.util.ServiceLoader;
import javax.validation.constraints.NotNull;
@@ -239,9 +236,6 @@ public class RefreshConfigTask {
return Flux.empty();
}
- GsonBuilder gsonBuilder = new GsonBuilder();
- ServiceLoader.load(TypeAdapterFactory.class).forEach(gsonBuilder::registerTypeAdapterFactory);
-
try (InputStream inputStream = createInputStream(filepath)) {
JsonObject rootObject = getJsonElement(inputStream).getAsJsonObject();
ApplicationConfigParser appParser = new ApplicationConfigParser();
diff --git a/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/SdncOscA1ClientTest.java b/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/SdncOscA1ClientTest.java
index 08e99847..1e9695c7 100644
--- a/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/SdncOscA1ClientTest.java
+++ b/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/SdncOscA1ClientTest.java
@@ -20,7 +20,6 @@
package org.onap.ccsdk.oran.a1policymanagementservice.clients;
-import static org.junit.Assert.fail;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.ArgumentMatchers.anyString;
diff --git a/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/dmaap/DmaapMessageConsumerTest.java b/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/dmaap/DmaapMessageConsumerTest.java
index cc9efa07..2bd8c7f4 100644
--- a/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/dmaap/DmaapMessageConsumerTest.java
+++ b/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/dmaap/DmaapMessageConsumerTest.java
@@ -22,6 +22,9 @@ package org.onap.ccsdk.oran.a1policymanagementservice.dmaap;
import static ch.qos.logback.classic.Level.WARN;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
@@ -34,8 +37,13 @@ import static org.mockito.Mockito.when;
import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.read.ListAppender;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.JsonObject;
+
import java.time.Duration;
import java.util.LinkedList;
+import java.util.List;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
@@ -46,6 +54,8 @@ import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.onap.ccsdk.oran.a1policymanagementservice.clients.AsyncRestClient;
import org.onap.ccsdk.oran.a1policymanagementservice.configuration.ApplicationConfig;
+import org.onap.ccsdk.oran.a1policymanagementservice.dmaap.DmaapRequestMessage.Operation;
+import org.onap.ccsdk.oran.a1policymanagementservice.exceptions.ServiceException;
import org.onap.ccsdk.oran.a1policymanagementservice.utils.LoggingUtils;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
@@ -62,6 +72,8 @@ class DmaapMessageConsumerTest {
private DmaapMessageConsumer messageConsumerUnderTest;
+ private Gson gson = new GsonBuilder().create();
+
@AfterEach
void resetLogging() {
LoggingUtils.getLogListAppender(DmaapMessageConsumer.class);
@@ -147,17 +159,7 @@ class DmaapMessageConsumerTest {
setUpMrConfig();
messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock));
- String message = "{\"apiVersion\":\"1.0\"," //
- + "\"operation\":\"GET\"," //
- + "\"correlationId\":\"1592341013115594000\"," //
- + "\"originatorId\":\"849e6c6b420\"," //
- + "\"payload\":{}," //
- + "\"requestId\":\"23343221\", " //
- + "\"target\":\"policy-management-service\"," //
- + "\"timestamp\":\"2020-06-16 20:56:53.115665\"," //
- + "\"type\":\"request\"," //
- + "\"url\":\"/rics\"}";
- String messages = "[" + message + "]";
+ String messages = jsonArray(gson.toJson(dmaapRequestMessage(Operation.PUT)));
doReturn(false, true).when(messageConsumerUnderTest).isStopped();
doReturn(messageRouterConsumerMock).when(messageConsumerUnderTest).getMessageRouterConsumer();
@@ -169,37 +171,84 @@ class DmaapMessageConsumerTest {
messageConsumerUnderTest.start().join();
- ArgumentCaptor<String> captor = ArgumentCaptor.forClass(String.class);
+ ArgumentCaptor<DmaapRequestMessage> captor = ArgumentCaptor.forClass(DmaapRequestMessage.class);
verify(messageHandlerMock).handleDmaapMsg(captor.capture());
- String messageAfterJsonParsing = captor.getValue();
- assertThat(messageAfterJsonParsing).contains("apiVersion");
+ DmaapRequestMessage messageAfterJsonParsing = captor.getValue();
+ assertThat(messageAfterJsonParsing.apiVersion()).isNotEmpty();
verifyNoMoreInteractions(messageHandlerMock);
}
@Test
- void dmaapConfiguredAndOneMessage_thenPollOnceAndProcessMessage2() throws Exception {
- // The message from MR is here an array of String (which is the case when the MR
- // simulator is used)
- setUpMrConfig();
- messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock));
-
- doReturn(false, true).when(messageConsumerUnderTest).isStopped();
- doReturn(messageRouterConsumerMock).when(messageConsumerUnderTest).getMessageRouterConsumer();
+ void testMessageParsing() throws ServiceException {
+ messageConsumerUnderTest = new DmaapMessageConsumer(applicationConfigMock);
+ String json = gson.toJson(dmaapRequestMessage(Operation.PUT));
+ {
+ String jsonArrayOfObject = jsonArray(json);
+ List<DmaapRequestMessage> parsedMessage = messageConsumerUnderTest.parseMessages(jsonArrayOfObject);
+ assertNotNull(parsedMessage);
+ assertTrue(parsedMessage.get(0).payload().isPresent());
+ }
+ {
+ String jsonArrayOfString = jsonArray(quote(json));
+ List<DmaapRequestMessage> parsedMessage = messageConsumerUnderTest.parseMessages(jsonArrayOfString);
+ assertNotNull(parsedMessage);
+ assertTrue(parsedMessage.get(0).payload().isPresent());
+ }
- Mono<ResponseEntity<String>> response = Mono.just(new ResponseEntity<>("[\"aMessage\"]", HttpStatus.OK));
- when(messageRouterConsumerMock.getForEntity(any())).thenReturn(response);
+ }
+ @Test
+ void incomingUnparsableRequest_thenSendResponse() throws Exception {
+ messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock));
doReturn(messageHandlerMock).when(messageConsumerUnderTest).getDmaapMessageHandler();
+ doReturn(Mono.just("OK")).when(messageHandlerMock).sendDmaapResponse(any(), any(), any());
+ Exception actualException =
+ assertThrows(ServiceException.class, () -> messageConsumerUnderTest.parseMessages("[\"abc:\"def\"]"));
+ assertThat(actualException.getMessage())
+ .contains("Could not parse incomming request. Reason :com.google.gson.stream.MalformedJsonException");
- messageConsumerUnderTest.start().join();
+ verify(messageHandlerMock).sendDmaapResponse(any(), any(), any());
+ }
- verify(messageHandlerMock).handleDmaapMsg("aMessage");
- verifyNoMoreInteractions(messageHandlerMock);
+ @Test
+ void incomingUnparsableRequest_thenSendingResponseFailed() throws Exception {
+ messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock));
+ doReturn(messageHandlerMock).when(messageConsumerUnderTest).getDmaapMessageHandler();
+ doReturn(Mono.error(new Exception("Sending response failed"))).when(messageHandlerMock).sendDmaapResponse(any(),
+ any(), any());
+ Exception actualException =
+ assertThrows(Exception.class, () -> messageConsumerUnderTest.parseMessages("[\"abc:\"def\"]"));
+ assertThat(actualException.getMessage()).contains("Sending response failed");
+
+ verify(messageHandlerMock).sendDmaapResponse(any(), any(), any());
}
private void setUpMrConfig() {
when(applicationConfigMock.getDmaapConsumerTopicUrl()).thenReturn("url");
when(applicationConfigMock.getDmaapProducerTopicUrl()).thenReturn("url");
}
+
+ private String jsonArray(String s) {
+ return "[" + s + "]";
+ }
+
+ private String quote(String s) {
+ return "\"" + s.replace("\"", "\\\"") + "\"";
+ }
+
+ private DmaapRequestMessage dmaapRequestMessage(Operation operation) {
+ return ImmutableDmaapRequestMessage.builder() //
+ .apiVersion("apiVersion") //
+ .correlationId("correlationId") //
+ .operation(operation) //
+ .originatorId("originatorId") //
+ .payload(new JsonObject()) //
+ .requestId("requestId") //
+ .target("target") //
+ .timestamp("timestamp") //
+ .url("URL") //
+ .build();
+ }
+
}
diff --git a/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/dmaap/DmaapMessageHandlerTest.java b/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/dmaap/DmaapMessageHandlerTest.java
index 2405e46c..3656ec1a 100644
--- a/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/dmaap/DmaapMessageHandlerTest.java
+++ b/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/dmaap/DmaapMessageHandlerTest.java
@@ -22,9 +22,6 @@ package org.onap.ccsdk.oran.a1policymanagementservice.dmaap;
import static ch.qos.logback.classic.Level.WARN;
import static org.assertj.core.api.Assertions.assertThat;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
@@ -66,19 +63,18 @@ class DmaapMessageHandlerTest {
private final AsyncRestClient dmaapClient = mock(AsyncRestClient.class);
private final AsyncRestClient pmsClient = mock(AsyncRestClient.class);
private DmaapMessageHandler testedObject;
- private static Gson gson = new GsonBuilder() //
- .create(); //
+ private Gson gson = new GsonBuilder().create(); //
@BeforeEach
private void setUp() throws Exception {
testedObject = spy(new DmaapMessageHandler(dmaapClient, pmsClient));
}
- static JsonObject payloadAsJson() {
+ JsonObject payloadAsJson() {
return gson.fromJson(payloadAsString(), JsonObject.class);
}
- static String payloadAsString() {
+ String payloadAsString() {
return "{\"param\":\"value\"}";
}
@@ -99,10 +95,6 @@ class DmaapMessageHandlerTest {
.build();
}
- private String dmaapInputMessage(Operation operation) {
- return gson.toJson(dmaapRequestMessage(operation));
- }
-
private Mono<ResponseEntity<String>> okResponse() {
ResponseEntity<String> entity = new ResponseEntity<>("OK", HttpStatus.OK);
return Mono.just(entity);
@@ -114,39 +106,11 @@ class DmaapMessageHandlerTest {
}
@Test
- void testMessageParsing() {
- String message = dmaapInputMessage(Operation.DELETE);
- logger.info(message);
- DmaapRequestMessage parsedMessage = gson.fromJson(message, ImmutableDmaapRequestMessage.class);
- assertNotNull(parsedMessage);
- assertFalse(parsedMessage.payload().isPresent());
-
- message = dmaapInputMessage(Operation.PUT);
- logger.info(message);
- parsedMessage = gson.fromJson(message, ImmutableDmaapRequestMessage.class);
- assertNotNull(parsedMessage);
- assertTrue(parsedMessage.payload().isPresent());
- }
-
- @Test
- void unparseableMessage_thenWarning() {
- final ListAppender<ILoggingEvent> logAppender =
- LoggingUtils.getLogListAppender(DmaapMessageHandler.class, WARN);
-
- String msg = "bad message";
- testedObject.handleDmaapMsg(msg);
-
- assertThat(logAppender.list.get(0).getFormattedMessage()).startsWith(
- "handleDmaapMsg failure org.onap.ccsdk.oran.a1policymanagementservice.exceptions.ServiceException: Received unparsable "
- + "message from DMAAP: \"" + msg + "\", reason: ");
- }
-
- @Test
void successfulDelete() throws IOException {
doReturn(okResponse()).when(pmsClient).deleteForEntity(anyString());
doReturn(Mono.just("OK")).when(dmaapClient).post(anyString(), anyString());
- String message = dmaapInputMessage(Operation.DELETE);
+ DmaapRequestMessage message = dmaapRequestMessage(Operation.DELETE);
StepVerifier //
.create(testedObject.createTask(message)) //
@@ -167,8 +131,9 @@ class DmaapMessageHandlerTest {
doReturn(okResponse()).when(pmsClient).getForEntity(anyString());
doReturn(Mono.just("OK")).when(dmaapClient).post(anyString(), anyString());
+ DmaapRequestMessage message = dmaapRequestMessage(Operation.GET);
StepVerifier //
- .create(testedObject.createTask(dmaapInputMessage(Operation.GET))) //
+ .create(testedObject.createTask(message)) //
.expectSubscription() //
.expectNext("OK") //
.verifyComplete(); //
@@ -188,8 +153,9 @@ class DmaapMessageHandlerTest {
doReturn(Mono.error(webClientResponseException)).when(pmsClient).getForEntity(anyString());
doReturn(Mono.just("OK")).when(dmaapClient).post(anyString(), anyString());
+ DmaapRequestMessage message = dmaapRequestMessage(Operation.GET);
StepVerifier //
- .create(testedObject.createTask(dmaapInputMessage(Operation.GET))) //
+ .create(testedObject.createTask(message)) //
.expectSubscription() //
.verifyComplete(); //
@@ -205,8 +171,9 @@ class DmaapMessageHandlerTest {
doReturn(okResponse()).when(pmsClient).putForEntity(anyString(), anyString());
doReturn(Mono.just("OK")).when(dmaapClient).post(anyString(), anyString());
+ DmaapRequestMessage message = dmaapRequestMessage(Operation.PUT);
StepVerifier //
- .create(testedObject.createTask(dmaapInputMessage(Operation.PUT))) //
+ .create(testedObject.createTask(message)) //
.expectSubscription() //
.expectNext("OK") //
.verifyComplete(); //
@@ -223,8 +190,9 @@ class DmaapMessageHandlerTest {
doReturn(okResponse()).when(pmsClient).postForEntity(anyString(), anyString());
doReturn(Mono.just("OK")).when(dmaapClient).post(anyString(), anyString());
+ DmaapRequestMessage message = dmaapRequestMessage(Operation.POST);
StepVerifier //
- .create(testedObject.createTask(dmaapInputMessage(Operation.POST))) //
+ .create(testedObject.createTask(message)) //
.expectSubscription() //
.expectNext("OK") //
.verifyComplete(); //
@@ -237,12 +205,13 @@ class DmaapMessageHandlerTest {
}
@Test
- void exceptionWhenCallingPms_thenNotFoundResponse() throws IOException {
+ void exceptionWhenCallingPms_thenErrorResponse() throws IOException {
doReturn(notOkResponse()).when(pmsClient).putForEntity(anyString(), anyString());
doReturn(Mono.just("OK")).when(dmaapClient).post(anyString(), anyString());
- testedObject.createTask(dmaapInputMessage(Operation.PUT)).block();
+ DmaapRequestMessage message = dmaapRequestMessage(Operation.PUT);
+ testedObject.createTask(message).block();
verify(pmsClient).putForEntity(anyString(), anyString());
verifyNoMoreInteractions(pmsClient);
@@ -257,24 +226,18 @@ class DmaapMessageHandlerTest {
}
@Test
- void unsupportedOperationInMessage_thenNotFoundResponseWithNotImplementedOperation() throws Exception {
- String message = dmaapInputMessage(Operation.PUT).toString();
- String badOperation = "BAD";
- message = message.replace(Operation.PUT.toString(), badOperation);
-
- testedObject.handleDmaapMsg(message);
-
- ArgumentCaptor<String> captor = ArgumentCaptor.forClass(String.class);
- verify(dmaapClient).post(anyString(), captor.capture());
- String actualMessage = captor.getValue();
- assertThat(actualMessage).contains("Not implemented operation") //
- .contains("BAD_REQUEST");
- }
-
- @Test
void putWithoutPayload_thenNotFoundResponseWithWarning() throws Exception {
- String message = dmaapInputMessage(Operation.PUT).toString();
- message = message.replace("payload", "junk");
+ DmaapRequestMessage message = ImmutableDmaapRequestMessage.builder() //
+ .apiVersion("apiVersion") //
+ .correlationId("correlationId") //
+ .operation(DmaapRequestMessage.Operation.PUT) //
+ .originatorId("originatorId") //
+ .payload(Optional.empty()) //
+ .requestId("requestId") //
+ .target("target") //
+ .timestamp("timestamp") //
+ .url(URL) //
+ .build();
final ListAppender<ILoggingEvent> logAppender =
LoggingUtils.getLogListAppender(DmaapMessageHandler.class, WARN);
@@ -284,4 +247,5 @@ class DmaapMessageHandlerTest {
assertThat(logAppender.list.get(0).getFormattedMessage())
.startsWith("Expected payload in message from DMAAP: ");
}
+
}