summaryrefslogtreecommitdiffstats
path: root/dcae-analytics/dcae-analytics-web/src/main/java/org/onap/dcae/analytics/web/dmaap/MrMessageSplitter.java
diff options
context:
space:
mode:
Diffstat (limited to 'dcae-analytics/dcae-analytics-web/src/main/java/org/onap/dcae/analytics/web/dmaap/MrMessageSplitter.java')
-rw-r--r--dcae-analytics/dcae-analytics-web/src/main/java/org/onap/dcae/analytics/web/dmaap/MrMessageSplitter.java48
1 files changed, 29 insertions, 19 deletions
diff --git a/dcae-analytics/dcae-analytics-web/src/main/java/org/onap/dcae/analytics/web/dmaap/MrMessageSplitter.java b/dcae-analytics/dcae-analytics-web/src/main/java/org/onap/dcae/analytics/web/dmaap/MrMessageSplitter.java
index fe8f7ed..0396005 100644
--- a/dcae-analytics/dcae-analytics-web/src/main/java/org/onap/dcae/analytics/web/dmaap/MrMessageSplitter.java
+++ b/dcae-analytics/dcae-analytics-web/src/main/java/org/onap/dcae/analytics/web/dmaap/MrMessageSplitter.java
@@ -24,11 +24,9 @@ import static org.apache.commons.text.StringEscapeUtils.unescapeJson;
import static org.onap.dcae.analytics.model.AnalyticsHttpConstants.REQUEST_ID_HEADER_KEY;
import static org.onap.dcae.analytics.model.AnalyticsModelConstants.ANALYTICS_REQUEST_ID_DELIMITER;
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
-
import java.io.IOException;
import java.util.Collections;
+import java.util.Date;
import java.util.LinkedList;
import java.util.List;
import java.util.stream.IntStream;
@@ -36,15 +34,22 @@ import java.util.stream.IntStream;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
+import org.onap.dcae.analytics.model.AnalyticsHttpConstants;
import org.onap.dcae.analytics.model.DmaapMrConstants;
-import org.onap.dcae.analytics.web.exception.AnalyticsParsingException;
+import org.onap.dcae.analytics.tca.core.exception.AnalyticsParsingException;
+import org.onap.dcae.analytics.tca.core.util.LogSpec;
import org.onap.dcae.analytics.web.util.AnalyticsHttpUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.onap.dcae.utils.eelf.logger.api.log.EELFLogFactory;
+import org.onap.dcae.utils.eelf.logger.api.log.EELFLogger;
+import org.onap.dcae.utils.eelf.logger.api.spec.AuditLogSpec;
+import org.onap.dcae.utils.eelf.logger.api.spec.ErrorLogSpec;
import org.springframework.integration.splitter.AbstractMessageSplitter;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
/**
* DMaaP MR message splitter split the incoming messages into batch of given batch size
*
@@ -52,7 +57,7 @@ import org.springframework.messaging.Message;
*/
public class MrMessageSplitter extends AbstractMessageSplitter {
- private static final Logger logger = LoggerFactory.getLogger(MrMessageSplitter.class);
+ private static final EELFLogger eelfLogger = EELFLogFactory.getLogger(MrMessageSplitter.class);
private final ObjectMapper objectMapper;
private final Integer batchSize;
@@ -66,18 +71,23 @@ public class MrMessageSplitter extends AbstractMessageSplitter {
@Override
protected Object splitMessage(final Message<?> message) {
- final List<String> dmaapMessages = convertJsonToStringMessages(String.class.cast(message.getPayload()).trim());
-
final String requestId = AnalyticsHttpUtils.getRequestId(message.getHeaders());
+ final List<String> dmaapMessages = convertJsonToStringMessages(requestId, String.class.cast(message.getPayload()).trim());
+
final String transactionId = AnalyticsHttpUtils.getTransactionId(message.getHeaders());
- logger.info("Request Id: {}, Transaction Id: {}, Received new messages from DMaaP MR. Count: {}",
- requestId, transactionId, dmaapMessages.size());
+ final Date requestBeginTimestamp = AnalyticsHttpUtils.getTimestampFromHeaders(message.getHeaders(),
+ AnalyticsHttpConstants.REQUEST_BEGIN_TS_HEADER_KEY);
+ final AuditLogSpec auditLogSpec = LogSpec.createAuditLogSpec(requestId, requestBeginTimestamp);
- final List<List<String>> messagePartitions = partition(dmaapMessages, batchSize);
+ eelfLogger.auditLog().info("Request Id: {}, Transaction Id: {}, dmaapMessages: {},"
+ + " Received new messages from DMaaP MR. Count: {}",
+ auditLogSpec, requestId, transactionId, dmaapMessages.toString(), String.valueOf(dmaapMessages.size()));
- logger.debug("Request Id: {}, Transaction Id: {}, Max allowed messages per batch: {}. " +
- "No of batches created: {}", requestId, transactionId, batchSize, messagePartitions.size());
+ final List<List<String>> messagePartitions = partition(dmaapMessages, batchSize);
+ eelfLogger.auditLog().info("Request Id: {}, Transaction Id: {}, Max allowed messages per batch: {}. " +
+ "No of batches created: {}",
+ auditLogSpec, requestId, transactionId, String.valueOf(batchSize), String.valueOf(messagePartitions.size()));
// append batch id to request id header
return messagePartitions.isEmpty() ? null : IntStream.range(0, messagePartitions.size())
@@ -100,7 +110,7 @@ public class MrMessageSplitter extends AbstractMessageSplitter {
*
* @return List containing DMaaP MR Messages
*/
- private List<String> convertJsonToStringMessages(@Nullable final String messagesJsonString) {
+ private List<String> convertJsonToStringMessages(String requestId, @Nullable final String messagesJsonString) {
final LinkedList<String> messages = new LinkedList<>();
@@ -128,9 +138,11 @@ public class MrMessageSplitter extends AbstractMessageSplitter {
}
} catch (IOException e) {
- final String errorMessage = String.format("Unable to convert subscriber Json String to Messages. " +
+ ErrorLogSpec errorLogSpec = LogSpec.createErrorLogSpec(requestId);
+ eelfLogger.errorLog().error("Unable to convert subscriber Json String to Messages. " +
+ "Subscriber Response String: {}, Json Error: {}", errorLogSpec, messagesJsonString, e.toString());
+ String errorMessage = String.format("Unable to convert subscriber Json String to Messages. " +
"Subscriber Response String: %s, Json Error: %s", messagesJsonString, e);
- logger.error(errorMessage, e);
throw new AnalyticsParsingException(errorMessage, e);
}
@@ -183,6 +195,4 @@ public class MrMessageSplitter extends AbstractMessageSplitter {
}
return result;
}
-
-
}