summaryrefslogtreecommitdiffstats
path: root/dcae-analytics-dmaap
diff options
context:
space:
mode:
Diffstat (limited to 'dcae-analytics-dmaap')
-rw-r--r--dcae-analytics-dmaap/src/main/java/org/openecomp/dcae/apod/analytics/dmaap/service/BaseDMaaPMRComponent.java38
-rw-r--r--dcae-analytics-dmaap/src/test/java/org/openecomp/dcae/apod/analytics/dmaap/service/publisher/DMaaPMRPublisherImplTest.java2
-rw-r--r--dcae-analytics-dmaap/src/test/java/org/openecomp/dcae/apod/analytics/dmaap/service/subscriber/DMaaPMRSubscriberImplTest.java6
3 files changed, 34 insertions, 12 deletions
diff --git a/dcae-analytics-dmaap/src/main/java/org/openecomp/dcae/apod/analytics/dmaap/service/BaseDMaaPMRComponent.java b/dcae-analytics-dmaap/src/main/java/org/openecomp/dcae/apod/analytics/dmaap/service/BaseDMaaPMRComponent.java
index 72f859c..58b1a1f 100644
--- a/dcae-analytics-dmaap/src/main/java/org/openecomp/dcae/apod/analytics/dmaap/service/BaseDMaaPMRComponent.java
+++ b/dcae-analytics-dmaap/src/main/java/org/openecomp/dcae/apod/analytics/dmaap/service/BaseDMaaPMRComponent.java
@@ -327,17 +327,24 @@ public abstract class BaseDMaaPMRComponent implements DMaaPMRComponent {
// If message string is not null or not empty parse json message array to List of string messages
if (messagesJsonString != null && !messagesJsonString.trim().isEmpty()
- && !"[]".equals(messagesJsonString.trim())) {
+ && !("[]").equals(messagesJsonString.trim())) {
try {
- final List messageList = objectMapper.readValue(messagesJsonString, List.class);
- for (Object message : messageList) {
- final String jsonMessageString = objectMapper.writeValueAsString(message);
- if (jsonMessageString.startsWith("\"") && jsonMessageString.endsWith("\"")) {
- final String jsonSubString = jsonMessageString.substring(1, jsonMessageString.length() - 1);
- messages.add(StringEscapeUtils.unescapeJson(jsonSubString));
+ // get root node
+ final JsonNode rootNode = objectMapper.readTree(messagesJsonString);
+ // iterate over root node and parse arrays messages
+ for (JsonNode jsonNode : rootNode) {
+ // if array parse it is array of messages
+ final String incomingMessageString = jsonNode.toString();
+ if (jsonNode.isArray()) {
+ final List messageList = objectMapper.readValue(incomingMessageString, List.class);
+ for (Object message : messageList) {
+ final String jsonMessageString = objectMapper.writeValueAsString(message);
+ addUnescapedJsonToMessage(messages, jsonMessageString);
+ }
} else {
- messages.add(StringEscapeUtils.unescapeJson(jsonMessageString));
+ // parse it as object
+ addUnescapedJsonToMessage(messages, incomingMessageString);
}
}
@@ -352,5 +359,20 @@ public abstract class BaseDMaaPMRComponent implements DMaaPMRComponent {
return messages;
}
+ /**
+ * Adds unescaped Json messages to given messages list
+ *
+ * @param messages message list in which unescaped messages will be added
+ * @param incomingMessageString incoming message string that may need to be escaped
+ */
+ private static void addUnescapedJsonToMessage(List<String> messages, String incomingMessageString) {
+ if (incomingMessageString.startsWith("\"") && incomingMessageString.endsWith("\"")) {
+ messages.add(StringEscapeUtils.unescapeJson(
+ incomingMessageString.substring(1, incomingMessageString.length() - 1)));
+ } else {
+ messages.add(StringEscapeUtils.unescapeJson(incomingMessageString));
+ }
+ }
+
}
diff --git a/dcae-analytics-dmaap/src/test/java/org/openecomp/dcae/apod/analytics/dmaap/service/publisher/DMaaPMRPublisherImplTest.java b/dcae-analytics-dmaap/src/test/java/org/openecomp/dcae/apod/analytics/dmaap/service/publisher/DMaaPMRPublisherImplTest.java
index a67e777..557742f 100644
--- a/dcae-analytics-dmaap/src/test/java/org/openecomp/dcae/apod/analytics/dmaap/service/publisher/DMaaPMRPublisherImplTest.java
+++ b/dcae-analytics-dmaap/src/test/java/org/openecomp/dcae/apod/analytics/dmaap/service/publisher/DMaaPMRPublisherImplTest.java
@@ -33,7 +33,7 @@ import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.Mockito;
-import org.mockito.runners.MockitoJUnitRunner;
+import org.mockito.junit.MockitoJUnitRunner;
import org.openecomp.dcae.apod.analytics.common.exception.DCAEAnalyticsRuntimeException;
import org.openecomp.dcae.apod.analytics.dmaap.BaseAnalyticsDMaaPUnitTest;
import org.openecomp.dcae.apod.analytics.dmaap.domain.config.DMaaPMRPublisherConfig;
diff --git a/dcae-analytics-dmaap/src/test/java/org/openecomp/dcae/apod/analytics/dmaap/service/subscriber/DMaaPMRSubscriberImplTest.java b/dcae-analytics-dmaap/src/test/java/org/openecomp/dcae/apod/analytics/dmaap/service/subscriber/DMaaPMRSubscriberImplTest.java
index 381fa70..c68447f 100644
--- a/dcae-analytics-dmaap/src/test/java/org/openecomp/dcae/apod/analytics/dmaap/service/subscriber/DMaaPMRSubscriberImplTest.java
+++ b/dcae-analytics-dmaap/src/test/java/org/openecomp/dcae/apod/analytics/dmaap/service/subscriber/DMaaPMRSubscriberImplTest.java
@@ -33,7 +33,7 @@ import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.Mockito;
-import org.mockito.runners.MockitoJUnitRunner;
+import org.mockito.junit.MockitoJUnitRunner;
import org.openecomp.dcae.apod.analytics.common.exception.DCAEAnalyticsRuntimeException;
import org.openecomp.dcae.apod.analytics.dmaap.BaseAnalyticsDMaaPUnitTest;
import org.openecomp.dcae.apod.analytics.dmaap.domain.config.DMaaPMRSubscriberConfig;
@@ -55,7 +55,7 @@ import static org.mockito.BDDMockito.given;
public class DMaaPMRSubscriberImplTest extends BaseAnalyticsDMaaPUnitTest {
@Mock
- CloseableHttpClient closeableHttpClient;
+ private CloseableHttpClient closeableHttpClient;
private String consumerGroup, consumerId;
@@ -152,7 +152,7 @@ public class DMaaPMRSubscriberImplTest extends BaseAnalyticsDMaaPUnitTest {
DMaaPMRSubscriberImpl dmaapMRSubscriberImpl = new DMaaPMRSubscriberImpl(
getSubscriberConfig(consumerId, consumerGroup), closeableHttpClient);
- DMaaPMRSubscriberResponse dmaapMRSubscriberResponse = dmaapMRSubscriberImpl.fetchMessages();
+ dmaapMRSubscriberImpl.fetchMessages();
}
}