diff options
Diffstat (limited to 'dcae-analytics-dmaap/src/main/java/org/openecomp/dcae/apod/analytics/dmaap/service')
11 files changed, 1251 insertions, 1251 deletions
diff --git a/dcae-analytics-dmaap/src/main/java/org/openecomp/dcae/apod/analytics/dmaap/service/BaseDMaaPMRComponent.java b/dcae-analytics-dmaap/src/main/java/org/openecomp/dcae/apod/analytics/dmaap/service/BaseDMaaPMRComponent.java index 58b1a1f..22f25d1 100644 --- a/dcae-analytics-dmaap/src/main/java/org/openecomp/dcae/apod/analytics/dmaap/service/BaseDMaaPMRComponent.java +++ b/dcae-analytics-dmaap/src/main/java/org/openecomp/dcae/apod/analytics/dmaap/service/BaseDMaaPMRComponent.java @@ -1,378 +1,378 @@ -/* - * ===============================LICENSE_START====================================== - * dcae-analytics - * ================================================================================ - * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============================LICENSE_END=========================================== - */ - -package org.openecomp.dcae.apod.analytics.dmaap.service; - -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.base.Optional; -import org.apache.commons.codec.binary.Base64; -import org.apache.commons.lang3.StringEscapeUtils; -import org.apache.commons.lang3.tuple.ImmutablePair; -import org.apache.commons.lang3.tuple.Pair; -import org.apache.http.HttpEntity; -import org.apache.http.HttpResponse; -import org.apache.http.client.ResponseHandler; -import org.apache.http.client.utils.URIBuilder; -import org.apache.http.util.EntityUtils; -import org.openecomp.dcae.apod.analytics.common.AnalyticsConstants; -import org.openecomp.dcae.apod.analytics.common.exception.DCAEAnalyticsRuntimeException; -import org.openecomp.dcae.apod.analytics.common.utils.HTTPUtils; -import org.openecomp.dcae.apod.analytics.dmaap.domain.config.DMaaPMRPublisherConfig; -import org.openecomp.dcae.apod.analytics.dmaap.domain.config.DMaaPMRSubscriberConfig; -import org.openecomp.dcae.apod.analytics.dmaap.domain.response.DMaaPMRPublisherResponse; -import org.openecomp.dcae.apod.analytics.dmaap.domain.response.DMaaPMRPublisherResponseImpl; -import org.openecomp.dcae.apod.analytics.dmaap.domain.response.DMaaPMRSubscriberResponse; -import org.openecomp.dcae.apod.analytics.dmaap.domain.response.DMaaPMRSubscriberResponseImpl; -import org.openecomp.dcae.apod.analytics.dmaap.service.publisher.DMaaPMRPublisherQueue; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.net.URI; -import java.net.URISyntaxException; -import java.nio.charset.Charset; -import java.util.LinkedList; -import java.util.List; - -import javax.annotation.Nonnull; -import javax.annotation.Nullable; - -import static java.lang.String.format; - -/** - * Base class for DMaaP MR Publishers and Subscriber Implementations containing various utility methods - * - * @author Rajiv Singla . Creation Date: 11/1/2016. - */ -public abstract class BaseDMaaPMRComponent implements DMaaPMRComponent { - - private static final Logger LOG = LoggerFactory.getLogger(BaseDMaaPMRComponent.class); - - private static final ObjectMapper objectMapper = new ObjectMapper(); - - /** - * Creates Base64 encoded Auth Header for given userName and Password - * If either user name of password are null return absent - * - * @param userName username - * @param userPassword user password - * @return base64 encoded auth header if username or password are both non null - */ - protected static Optional<String> getAuthHeader(@Nullable final String userName, - @Nullable final String userPassword) { - if (userName == null || userPassword == null) { - return Optional.absent(); - } else { - final String auth = userName + ":" + userPassword; - final Charset isoCharset = Charset.forName("ISO-8859-1"); - byte[] encodedAuth = Base64.encodeBase64(auth.getBytes(isoCharset)); - return Optional.of("Basic " + new String(encodedAuth, isoCharset)); - } - } - - - /** - * Creates Publisher URI for given {@link DMaaPMRPublisherConfig} - * - * @param publisherConfig publisher settings - * - * @return DMaaP MR Publisher Topic URI that can be used to post messages to MR Topic - */ - protected static URI createPublisherURI(final DMaaPMRPublisherConfig publisherConfig) { - final String hostName = publisherConfig.getHostName(); - final Integer portNumber = publisherConfig.getPortNumber(); - final String getProtocol = publisherConfig.getProtocol(); - final String topicName = publisherConfig.getTopicName(); - URI publisherURI = null; - try { - publisherURI = new URIBuilder().setScheme(getProtocol).setHost(hostName).setPort(portNumber) - .setPath(AnalyticsConstants.DMAAP_URI_PATH_PREFIX + topicName).build(); - } catch (URISyntaxException e) { - final String errorMessage = format("Error while creating publisher URI: %s", e); - throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e); - } - LOG.info("Created DMaaP MR Publisher URI: {}", publisherURI); - return publisherURI; - } - - - /** - * Creates Subscriber URI for given {@link DMaaPMRSubscriberConfig} - * - * @param subscriberConfig subscriber settings - * - * @return DMaaP MR Subscriber Topic URI that can be used to fetch messages from MR topic - */ - protected static URI createSubscriberURI(final DMaaPMRSubscriberConfig subscriberConfig) { - final String hostName = subscriberConfig.getHostName(); - final Integer portNumber = subscriberConfig.getPortNumber(); - final String getProtocol = subscriberConfig.getProtocol(); - final String topicName = subscriberConfig.getTopicName(); - final String consumerId = subscriberConfig.getConsumerId(); - final String consumerGroup = subscriberConfig.getConsumerGroup(); - final Integer timeoutMS = subscriberConfig.getTimeoutMS(); - final Integer messageLimit = subscriberConfig.getMessageLimit(); - URI subscriberURI = null; - try { - URIBuilder uriBuilder = new URIBuilder().setScheme(getProtocol).setHost(hostName).setPort(portNumber) - .setPath(AnalyticsConstants.DMAAP_URI_PATH_PREFIX - + topicName + "/" - + consumerGroup + "/" + - consumerId); - // add query params if present - if (timeoutMS > 0) { - uriBuilder.addParameter(AnalyticsConstants.SUBSCRIBER_TIMEOUT_QUERY_PARAM_NAME, timeoutMS.toString()); - } - if (messageLimit > 0) { - uriBuilder.addParameter(AnalyticsConstants.SUBSCRIBER_MSG_LIMIT_QUERY_PARAM_NAME, - messageLimit.toString()); - } - subscriberURI = uriBuilder.build(); - - } catch (URISyntaxException e) { - final String errorMessage = format("Error while creating subscriber URI: %s", e); - throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e); - } - - LOG.info("Created DMaaP MR Subscriber URI: {}", subscriberURI); - return subscriberURI; - } - - - /** - * Creates 202 (Accepted) Response code message - * - * @param batchQueueSize batch Queue size - * - * @return response with 202 message code - */ - protected static DMaaPMRPublisherResponse createPublisherAcceptedResponse(int batchQueueSize) { - return createPublisherResponse(HTTPUtils.HTTP_ACCEPTED_RESPONSE_CODE, - "Accepted - Messages queued for batch publishing to MR Topic", batchQueueSize); - } - - - /** - * Creates 204 (No Content) Response code message - * - * @return response with 204 message code - */ - protected static DMaaPMRPublisherResponse createPublisherNoContentResponse() { - return createPublisherResponse(HTTPUtils.HTTP_NO_CONTENT_RESPONSE_CODE, - "No Content - No Messages in batch queue for flushing to MR Topic", 0); - } - - - /** - * Creates Publisher Response for given response code, response Message and pending Message Count - * - * @param responseCode HTTP Status Code - * @param responseMessage response message - * @param pendingMessages pending messages in batch queue - * - * @return DMaaP MR Publisher Response - */ - protected static DMaaPMRPublisherResponse createPublisherResponse(int responseCode, String - responseMessage, int pendingMessages) { - return new DMaaPMRPublisherResponseImpl(responseCode, responseMessage, pendingMessages); - } - - - /** - * Returns weekly consistent pending messages in batch queue - * - * @param publisherQueue batch queue - * @param publisherConfig publisher settings - * - * @return pending messages to be published - */ - protected static int getPendingMessages(@Nonnull final DMaaPMRPublisherQueue publisherQueue, - @Nonnull final DMaaPMRPublisherConfig publisherConfig) { - return publisherConfig.getMaxBatchSize() - publisherQueue.getBatchQueueRemainingSize(); - } - - - /** - * Creates Subscriber Response for give response Code, response Message and fetch messages - * - * @param responseCode response Code - * @param responseMessage response Message - * @param fetchedMessages fetched messages - * - * @return DMaaP MR Subscriber Response - */ - protected static DMaaPMRSubscriberResponse createSubscriberResponse(int responseCode, String - responseMessage, List<String> fetchedMessages) { - if (fetchedMessages == null) { - return new DMaaPMRSubscriberResponseImpl(responseCode, responseMessage); - } else { - return new DMaaPMRSubscriberResponseImpl(responseCode, responseMessage, fetchedMessages); - } - } - - - /** - * Custom response handler which extract status code and response body - * - * @return Pair containing Response code and response body - */ - protected static ResponseHandler<Pair<Integer, String>> responseHandler() { - return new ResponseHandler<Pair<Integer, String>>() { - @Override - public Pair<Integer, String> handleResponse(HttpResponse response) throws IOException { - // Get Response status code - final int status = response.getStatusLine().getStatusCode(); - final HttpEntity responseEntity = response.getEntity(); - // If response entity is not null - extract response body as string - String responseEntityString = ""; - if (responseEntity != null) { - responseEntityString = EntityUtils.toString(responseEntity); - } - return new ImmutablePair<>(status, responseEntityString); - } - }; - } - - - /** - * Adds message to Publisher recovery queue. If recovery queue is full throws an error as messages will - * be lost - * - * @param publisherQueue publisher queue - * @param messages recoverable messages to be published to recovery queue - */ - protected static void addMessagesToRecoveryQueue(DMaaPMRPublisherQueue publisherQueue, - List<String> messages) { - try { - publisherQueue.addRecoverableMessages(messages); - - LOG.debug("Messages Added to Recovery Queue. Messages Size: {}, Recovery Queue Remaining Size: {}", - messages.size(), publisherQueue.getBatchQueueRemainingSize()); - - } catch (IllegalStateException e) { - final String errorMessage = format("Unable to put messages in recovery queue. Messages will be lost. " + - "Recovery Queue might be full. Message Size: %d, Recovery Queue Remaining Capacity: %d", - messages.size(), publisherQueue.getRecoveryQueueRemainingSize()); - throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e); - } - } - - - /** - * Converts List of messages to Json String Array which can be published to DMaaP MR topic. - * - * @param messages messages that need to parsed to Json Array representation - * @return json string representation of message - */ - protected static String convertToJsonString(@Nullable final List<String> messages) { - // If messages are null or empty just return empty array - if (messages == null || messages.isEmpty()) { - return "[]"; - } - - - List<JsonNode> jsonMessageObjectsList = new LinkedList<>(); - - try { - for (String message : messages) { - final JsonNode jsonNode = objectMapper.readTree(message); - jsonMessageObjectsList.add(jsonNode); - } - return objectMapper.writeValueAsString(jsonMessageObjectsList); - } catch (JsonProcessingException e) { - final String errorMessage = - format("Unable to convert publisher messages to Json. Messages: %s, Json Error: %s", - messages, e); - throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e); - - } catch (IOException e) { - final String errorMessage = - format("IO Exception while converting publisher messages to Json. Messages: %s, Json Error: %s", - messages, e); - throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e); - } - } - - - /** - * Converts subscriber messages json string to List of messages. If message Json String is empty - * or null - * - * @param messagesJsonString json messages String - * - * @return List containing DMaaP MR Messages - */ - protected static List<String> convertJsonToStringMessages(@Nullable final String messagesJsonString) { - - final LinkedList<String> messages = new LinkedList<>(); - - // If message string is not null or not empty parse json message array to List of string messages - if (messagesJsonString != null && !messagesJsonString.trim().isEmpty() - && !("[]").equals(messagesJsonString.trim())) { - - try { - // get root node - final JsonNode rootNode = objectMapper.readTree(messagesJsonString); - // iterate over root node and parse arrays messages - for (JsonNode jsonNode : rootNode) { - // if array parse it is array of messages - final String incomingMessageString = jsonNode.toString(); - if (jsonNode.isArray()) { - final List messageList = objectMapper.readValue(incomingMessageString, List.class); - for (Object message : messageList) { - final String jsonMessageString = objectMapper.writeValueAsString(message); - addUnescapedJsonToMessage(messages, jsonMessageString); - } - } else { - // parse it as object - addUnescapedJsonToMessage(messages, incomingMessageString); - } - } - - } catch (IOException e) { - final String errorMessage = - format("Unable to convert subscriber Json String to Messages. Subscriber Response String: %s," + - " Json Error: %s", messagesJsonString, e); - throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e); - } - - } - return messages; - } - - /** - * Adds unescaped Json messages to given messages list - * - * @param messages message list in which unescaped messages will be added - * @param incomingMessageString incoming message string that may need to be escaped - */ - private static void addUnescapedJsonToMessage(List<String> messages, String incomingMessageString) { - if (incomingMessageString.startsWith("\"") && incomingMessageString.endsWith("\"")) { - messages.add(StringEscapeUtils.unescapeJson( - incomingMessageString.substring(1, incomingMessageString.length() - 1))); - } else { - messages.add(StringEscapeUtils.unescapeJson(incomingMessageString)); - } - } - - -} +/*
+ * ===============================LICENSE_START======================================
+ * dcae-analytics
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============================LICENSE_END===========================================
+ */
+
+package org.openecomp.dcae.apod.analytics.dmaap.service;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Optional;
+import org.apache.commons.codec.binary.Base64;
+import org.apache.commons.lang3.StringEscapeUtils;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpResponse;
+import org.apache.http.client.ResponseHandler;
+import org.apache.http.client.utils.URIBuilder;
+import org.apache.http.util.EntityUtils;
+import org.openecomp.dcae.apod.analytics.common.AnalyticsConstants;
+import org.openecomp.dcae.apod.analytics.common.exception.DCAEAnalyticsRuntimeException;
+import org.openecomp.dcae.apod.analytics.common.utils.HTTPUtils;
+import org.openecomp.dcae.apod.analytics.dmaap.domain.config.DMaaPMRPublisherConfig;
+import org.openecomp.dcae.apod.analytics.dmaap.domain.config.DMaaPMRSubscriberConfig;
+import org.openecomp.dcae.apod.analytics.dmaap.domain.response.DMaaPMRPublisherResponse;
+import org.openecomp.dcae.apod.analytics.dmaap.domain.response.DMaaPMRPublisherResponseImpl;
+import org.openecomp.dcae.apod.analytics.dmaap.domain.response.DMaaPMRSubscriberResponse;
+import org.openecomp.dcae.apod.analytics.dmaap.domain.response.DMaaPMRSubscriberResponseImpl;
+import org.openecomp.dcae.apod.analytics.dmaap.service.publisher.DMaaPMRPublisherQueue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.charset.Charset;
+import java.util.LinkedList;
+import java.util.List;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import static java.lang.String.format;
+
+/**
+ * Base class for DMaaP MR Publishers and Subscriber Implementations containing various utility methods
+ *
+ * @author Rajiv Singla . Creation Date: 11/1/2016.
+ */
+public abstract class BaseDMaaPMRComponent implements DMaaPMRComponent {
+
+ private static final Logger LOG = LoggerFactory.getLogger(BaseDMaaPMRComponent.class);
+
+ private static final ObjectMapper objectMapper = new ObjectMapper();
+
+ /**
+ * Creates Base64 encoded Auth Header for given userName and Password
+ * If either user name of password are null return absent
+ *
+ * @param userName username
+ * @param userPassword user password
+ * @return base64 encoded auth header if username or password are both non null
+ */
+ protected static Optional<String> getAuthHeader(@Nullable final String userName,
+ @Nullable final String userPassword) {
+ if (userName == null || userPassword == null) {
+ return Optional.absent();
+ } else {
+ final String auth = userName + ":" + userPassword;
+ final Charset isoCharset = Charset.forName("ISO-8859-1");
+ byte[] encodedAuth = Base64.encodeBase64(auth.getBytes(isoCharset));
+ return Optional.of("Basic " + new String(encodedAuth, isoCharset));
+ }
+ }
+
+
+ /**
+ * Creates Publisher URI for given {@link DMaaPMRPublisherConfig}
+ *
+ * @param publisherConfig publisher settings
+ *
+ * @return DMaaP MR Publisher Topic URI that can be used to post messages to MR Topic
+ */
+ protected static URI createPublisherURI(final DMaaPMRPublisherConfig publisherConfig) {
+ final String hostName = publisherConfig.getHostName();
+ final Integer portNumber = publisherConfig.getPortNumber();
+ final String getProtocol = publisherConfig.getProtocol();
+ final String topicName = publisherConfig.getTopicName();
+ URI publisherURI = null;
+ try {
+ publisherURI = new URIBuilder().setScheme(getProtocol).setHost(hostName).setPort(portNumber)
+ .setPath(AnalyticsConstants.DMAAP_URI_PATH_PREFIX + topicName).build();
+ } catch (URISyntaxException e) {
+ final String errorMessage = format("Error while creating publisher URI: %s", e);
+ throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e);
+ }
+ LOG.info("Created DMaaP MR Publisher URI: {}", publisherURI);
+ return publisherURI;
+ }
+
+
+ /**
+ * Creates Subscriber URI for given {@link DMaaPMRSubscriberConfig}
+ *
+ * @param subscriberConfig subscriber settings
+ *
+ * @return DMaaP MR Subscriber Topic URI that can be used to fetch messages from MR topic
+ */
+ protected static URI createSubscriberURI(final DMaaPMRSubscriberConfig subscriberConfig) {
+ final String hostName = subscriberConfig.getHostName();
+ final Integer portNumber = subscriberConfig.getPortNumber();
+ final String getProtocol = subscriberConfig.getProtocol();
+ final String topicName = subscriberConfig.getTopicName();
+ final String consumerId = subscriberConfig.getConsumerId();
+ final String consumerGroup = subscriberConfig.getConsumerGroup();
+ final Integer timeoutMS = subscriberConfig.getTimeoutMS();
+ final Integer messageLimit = subscriberConfig.getMessageLimit();
+ URI subscriberURI = null;
+ try {
+ URIBuilder uriBuilder = new URIBuilder().setScheme(getProtocol).setHost(hostName).setPort(portNumber)
+ .setPath(AnalyticsConstants.DMAAP_URI_PATH_PREFIX
+ + topicName + "/"
+ + consumerGroup + "/" +
+ consumerId);
+ // add query params if present
+ if (timeoutMS > 0) {
+ uriBuilder.addParameter(AnalyticsConstants.SUBSCRIBER_TIMEOUT_QUERY_PARAM_NAME, timeoutMS.toString());
+ }
+ if (messageLimit > 0) {
+ uriBuilder.addParameter(AnalyticsConstants.SUBSCRIBER_MSG_LIMIT_QUERY_PARAM_NAME,
+ messageLimit.toString());
+ }
+ subscriberURI = uriBuilder.build();
+
+ } catch (URISyntaxException e) {
+ final String errorMessage = format("Error while creating subscriber URI: %s", e);
+ throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e);
+ }
+
+ LOG.info("Created DMaaP MR Subscriber URI: {}", subscriberURI);
+ return subscriberURI;
+ }
+
+
+ /**
+ * Creates 202 (Accepted) Response code message
+ *
+ * @param batchQueueSize batch Queue size
+ *
+ * @return response with 202 message code
+ */
+ protected static DMaaPMRPublisherResponse createPublisherAcceptedResponse(int batchQueueSize) {
+ return createPublisherResponse(HTTPUtils.HTTP_ACCEPTED_RESPONSE_CODE,
+ "Accepted - Messages queued for batch publishing to MR Topic", batchQueueSize);
+ }
+
+
+ /**
+ * Creates 204 (No Content) Response code message
+ *
+ * @return response with 204 message code
+ */
+ protected static DMaaPMRPublisherResponse createPublisherNoContentResponse() {
+ return createPublisherResponse(HTTPUtils.HTTP_NO_CONTENT_RESPONSE_CODE,
+ "No Content - No Messages in batch queue for flushing to MR Topic", 0);
+ }
+
+
+ /**
+ * Creates Publisher Response for given response code, response Message and pending Message Count
+ *
+ * @param responseCode HTTP Status Code
+ * @param responseMessage response message
+ * @param pendingMessages pending messages in batch queue
+ *
+ * @return DMaaP MR Publisher Response
+ */
+ protected static DMaaPMRPublisherResponse createPublisherResponse(int responseCode, String
+ responseMessage, int pendingMessages) {
+ return new DMaaPMRPublisherResponseImpl(responseCode, responseMessage, pendingMessages);
+ }
+
+
+ /**
+ * Returns weekly consistent pending messages in batch queue
+ *
+ * @param publisherQueue batch queue
+ * @param publisherConfig publisher settings
+ *
+ * @return pending messages to be published
+ */
+ protected static int getPendingMessages(@Nonnull final DMaaPMRPublisherQueue publisherQueue,
+ @Nonnull final DMaaPMRPublisherConfig publisherConfig) {
+ return publisherConfig.getMaxBatchSize() - publisherQueue.getBatchQueueRemainingSize();
+ }
+
+
+ /**
+ * Creates Subscriber Response for give response Code, response Message and fetch messages
+ *
+ * @param responseCode response Code
+ * @param responseMessage response Message
+ * @param fetchedMessages fetched messages
+ *
+ * @return DMaaP MR Subscriber Response
+ */
+ protected static DMaaPMRSubscriberResponse createSubscriberResponse(int responseCode, String
+ responseMessage, List<String> fetchedMessages) {
+ if (fetchedMessages == null) {
+ return new DMaaPMRSubscriberResponseImpl(responseCode, responseMessage);
+ } else {
+ return new DMaaPMRSubscriberResponseImpl(responseCode, responseMessage, fetchedMessages);
+ }
+ }
+
+
+ /**
+ * Custom response handler which extract status code and response body
+ *
+ * @return Pair containing Response code and response body
+ */
+ protected static ResponseHandler<Pair<Integer, String>> responseHandler() {
+ return new ResponseHandler<Pair<Integer, String>>() {
+ @Override
+ public Pair<Integer, String> handleResponse(HttpResponse response) throws IOException {
+ // Get Response status code
+ final int status = response.getStatusLine().getStatusCode();
+ final HttpEntity responseEntity = response.getEntity();
+ // If response entity is not null - extract response body as string
+ String responseEntityString = "";
+ if (responseEntity != null) {
+ responseEntityString = EntityUtils.toString(responseEntity);
+ }
+ return new ImmutablePair<>(status, responseEntityString);
+ }
+ };
+ }
+
+
+ /**
+ * Adds message to Publisher recovery queue. If recovery queue is full throws an error as messages will
+ * be lost
+ *
+ * @param publisherQueue publisher queue
+ * @param messages recoverable messages to be published to recovery queue
+ */
+ protected static void addMessagesToRecoveryQueue(DMaaPMRPublisherQueue publisherQueue,
+ List<String> messages) {
+ try {
+ publisherQueue.addRecoverableMessages(messages);
+
+ LOG.debug("Messages Added to Recovery Queue. Messages Size: {}, Recovery Queue Remaining Size: {}",
+ messages.size(), publisherQueue.getBatchQueueRemainingSize());
+
+ } catch (IllegalStateException e) {
+ final String errorMessage = format("Unable to put messages in recovery queue. Messages will be lost. " +
+ "Recovery Queue might be full. Message Size: %d, Recovery Queue Remaining Capacity: %d",
+ messages.size(), publisherQueue.getRecoveryQueueRemainingSize());
+ throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e);
+ }
+ }
+
+
+ /**
+ * Converts List of messages to Json String Array which can be published to DMaaP MR topic.
+ *
+ * @param messages messages that need to parsed to Json Array representation
+ * @return json string representation of message
+ */
+ protected static String convertToJsonString(@Nullable final List<String> messages) {
+ // If messages are null or empty just return empty array
+ if (messages == null || messages.isEmpty()) {
+ return "[]";
+ }
+
+
+ List<JsonNode> jsonMessageObjectsList = new LinkedList<>();
+
+ try {
+ for (String message : messages) {
+ final JsonNode jsonNode = objectMapper.readTree(message);
+ jsonMessageObjectsList.add(jsonNode);
+ }
+ return objectMapper.writeValueAsString(jsonMessageObjectsList);
+ } catch (JsonProcessingException e) {
+ final String errorMessage =
+ format("Unable to convert publisher messages to Json. Messages: %s, Json Error: %s",
+ messages, e);
+ throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e);
+
+ } catch (IOException e) {
+ final String errorMessage =
+ format("IO Exception while converting publisher messages to Json. Messages: %s, Json Error: %s",
+ messages, e);
+ throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e);
+ }
+ }
+
+
+ /**
+ * Converts subscriber messages json string to List of messages. If message Json String is empty
+ * or null
+ *
+ * @param messagesJsonString json messages String
+ *
+ * @return List containing DMaaP MR Messages
+ */
+ protected static List<String> convertJsonToStringMessages(@Nullable final String messagesJsonString) {
+
+ final LinkedList<String> messages = new LinkedList<>();
+
+ // If message string is not null or not empty parse json message array to List of string messages
+ if (messagesJsonString != null && !messagesJsonString.trim().isEmpty()
+ && !("[]").equals(messagesJsonString.trim())) {
+
+ try {
+ // get root node
+ final JsonNode rootNode = objectMapper.readTree(messagesJsonString);
+ // iterate over root node and parse arrays messages
+ for (JsonNode jsonNode : rootNode) {
+ // if array parse it is array of messages
+ final String incomingMessageString = jsonNode.toString();
+ if (jsonNode.isArray()) {
+ final List messageList = objectMapper.readValue(incomingMessageString, List.class);
+ for (Object message : messageList) {
+ final String jsonMessageString = objectMapper.writeValueAsString(message);
+ addUnescapedJsonToMessage(messages, jsonMessageString);
+ }
+ } else {
+ // parse it as object
+ addUnescapedJsonToMessage(messages, incomingMessageString);
+ }
+ }
+
+ } catch (IOException e) {
+ final String errorMessage =
+ format("Unable to convert subscriber Json String to Messages. Subscriber Response String: %s," +
+ " Json Error: %s", messagesJsonString, e);
+ throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e);
+ }
+
+ }
+ return messages;
+ }
+
+ /**
+ * Adds unescaped Json messages to given messages list
+ *
+ * @param messages message list in which unescaped messages will be added
+ * @param incomingMessageString incoming message string that may need to be escaped
+ */
+ private static void addUnescapedJsonToMessage(List<String> messages, String incomingMessageString) {
+ if (incomingMessageString.startsWith("\"") && incomingMessageString.endsWith("\"")) {
+ messages.add(StringEscapeUtils.unescapeJson(
+ incomingMessageString.substring(1, incomingMessageString.length() - 1)));
+ } else {
+ messages.add(StringEscapeUtils.unescapeJson(incomingMessageString));
+ }
+ }
+
+
+}
diff --git a/dcae-analytics-dmaap/src/main/java/org/openecomp/dcae/apod/analytics/dmaap/service/DMaaPMRComponent.java b/dcae-analytics-dmaap/src/main/java/org/openecomp/dcae/apod/analytics/dmaap/service/DMaaPMRComponent.java index 30c244b..5c0df09 100644 --- a/dcae-analytics-dmaap/src/main/java/org/openecomp/dcae/apod/analytics/dmaap/service/DMaaPMRComponent.java +++ b/dcae-analytics-dmaap/src/main/java/org/openecomp/dcae/apod/analytics/dmaap/service/DMaaPMRComponent.java @@ -1,29 +1,29 @@ -/* - * ===============================LICENSE_START====================================== - * dcae-analytics - * ================================================================================ - * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============================LICENSE_END=========================================== - */ - -package org.openecomp.dcae.apod.analytics.dmaap.service; - -/** - * Marker interface for all DMaaP MR Components e.g. MR Publishers, MR Subscribers - * - * @author Rajiv Singla . Creation Date: 11/1/2016. - */ -public interface DMaaPMRComponent { -} +/*
+ * ===============================LICENSE_START======================================
+ * dcae-analytics
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============================LICENSE_END===========================================
+ */
+
+package org.openecomp.dcae.apod.analytics.dmaap.service;
+
+/**
+ * Marker interface for all DMaaP MR Components e.g. MR Publishers, MR Subscribers
+ *
+ * @author Rajiv Singla . Creation Date: 11/1/2016.
+ */
+public interface DMaaPMRComponent {
+}
diff --git a/dcae-analytics-dmaap/src/main/java/org/openecomp/dcae/apod/analytics/dmaap/service/publisher/DMaaPMRPublisher.java b/dcae-analytics-dmaap/src/main/java/org/openecomp/dcae/apod/analytics/dmaap/service/publisher/DMaaPMRPublisher.java index 1d37786..48b0a70 100644 --- a/dcae-analytics-dmaap/src/main/java/org/openecomp/dcae/apod/analytics/dmaap/service/publisher/DMaaPMRPublisher.java +++ b/dcae-analytics-dmaap/src/main/java/org/openecomp/dcae/apod/analytics/dmaap/service/publisher/DMaaPMRPublisher.java @@ -1,95 +1,95 @@ -/* - * ===============================LICENSE_START====================================== - * dcae-analytics - * ================================================================================ - * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============================LICENSE_END=========================================== - */ - -package org.openecomp.dcae.apod.analytics.dmaap.service.publisher; - -import org.openecomp.dcae.apod.analytics.common.exception.DCAEAnalyticsRuntimeException; -import org.openecomp.dcae.apod.analytics.dmaap.domain.config.DMaaPMRPublisherConfig; -import org.openecomp.dcae.apod.analytics.dmaap.domain.response.DMaaPMRPublisherResponse; - -import java.util.Date; -import java.util.List; - -/** - * <p> - * DMaaP MR Publisher can be used to publish messages to DMaaP MR Topics. - * <p> - * - * @author Rajiv Singla . Creation Date: 10/13/2016. - */ -public interface DMaaPMRPublisher extends AutoCloseable { - - - /** - * <p> - * Adds collection of messages to DMaaP MR Topic Publishing Queue. - * <p> - * Note: Invoking this method may or may not cause publishing immediately - * as publishing in done is batch mode by default. Parameter maxBatchSize - * in {@link DMaaPMRPublisherConfig} is used to determine max batch queue size. - * If the maxBatchSize is reached all message will be published automatically - * during subsequent call. - * </p> - * - * @param messages messages to publish to DMaaP MR Publisher - * @return response which may contain Http Response code 202 (Accepted) as publishing - * will proceed when max batch size is reached. Throws {@link DCAEAnalyticsRuntimeException} - * if publishing fails - */ - DMaaPMRPublisherResponse publish(List<String> messages); - - - /** - * <p> - * Forces publishing of messages to DMaaP MR Topic and returns {@link DMaaPMRPublisherResponse} - * which can be inspected for HTTP status code of publishing call to DMaaP MR Topic. - * </p> - * - * @param messages messages to publish to DMaaP MR Publisher - * @return DMaaP Message Router Publisher Response. Throws {@link DCAEAnalyticsRuntimeException} - * if force publishing fails - * - */ - DMaaPMRPublisherResponse forcePublish(List<String> messages); - - - /** - * <p> - * Forces publishing of messages in Publisher queue to DMaaP MR Topic and returns - * {@link DMaaPMRPublisherResponse}.If there are no messages were in the queue to - * be flushed response code 304 (Not Modified) will be returned - * </p> - * - * @return DMaaP Message Router Publisher Response - */ - DMaaPMRPublisherResponse flush(); - - - /** - * <p> - * Returns the creation time when Publisher instance was created. - * <p> - * - * @return creation time of Subscriber instance - */ - Date getPublisherCreationTime(); - - -} +/*
+ * ===============================LICENSE_START======================================
+ * dcae-analytics
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============================LICENSE_END===========================================
+ */
+
+package org.openecomp.dcae.apod.analytics.dmaap.service.publisher;
+
+import org.openecomp.dcae.apod.analytics.common.exception.DCAEAnalyticsRuntimeException;
+import org.openecomp.dcae.apod.analytics.dmaap.domain.config.DMaaPMRPublisherConfig;
+import org.openecomp.dcae.apod.analytics.dmaap.domain.response.DMaaPMRPublisherResponse;
+
+import java.util.Date;
+import java.util.List;
+
+/**
+ * <p>
+ * DMaaP MR Publisher can be used to publish messages to DMaaP MR Topics.
+ * <p>
+ *
+ * @author Rajiv Singla . Creation Date: 10/13/2016.
+ */
+public interface DMaaPMRPublisher extends AutoCloseable {
+
+
+ /**
+ * <p>
+ * Adds collection of messages to DMaaP MR Topic Publishing Queue.
+ * <p>
+ * Note: Invoking this method may or may not cause publishing immediately
+ * as publishing in done is batch mode by default. Parameter maxBatchSize
+ * in {@link DMaaPMRPublisherConfig} is used to determine max batch queue size.
+ * If the maxBatchSize is reached all message will be published automatically
+ * during subsequent call.
+ * </p>
+ *
+ * @param messages messages to publish to DMaaP MR Publisher
+ * @return response which may contain Http Response code 202 (Accepted) as publishing
+ * will proceed when max batch size is reached. Throws {@link DCAEAnalyticsRuntimeException}
+ * if publishing fails
+ */
+ DMaaPMRPublisherResponse publish(List<String> messages);
+
+
+ /**
+ * <p>
+ * Forces publishing of messages to DMaaP MR Topic and returns {@link DMaaPMRPublisherResponse}
+ * which can be inspected for HTTP status code of publishing call to DMaaP MR Topic.
+ * </p>
+ *
+ * @param messages messages to publish to DMaaP MR Publisher
+ * @return DMaaP Message Router Publisher Response. Throws {@link DCAEAnalyticsRuntimeException}
+ * if force publishing fails
+ *
+ */
+ DMaaPMRPublisherResponse forcePublish(List<String> messages);
+
+
+ /**
+ * <p>
+ * Forces publishing of messages in Publisher queue to DMaaP MR Topic and returns
+ * {@link DMaaPMRPublisherResponse}.If there are no messages were in the queue to
+ * be flushed response code 304 (Not Modified) will be returned
+ * </p>
+ *
+ * @return DMaaP Message Router Publisher Response
+ */
+ DMaaPMRPublisherResponse flush();
+
+
+ /**
+ * <p>
+ * Returns the creation time when Publisher instance was created.
+ * <p>
+ *
+ * @return creation time of Subscriber instance
+ */
+ Date getPublisherCreationTime();
+
+
+}
diff --git a/dcae-analytics-dmaap/src/main/java/org/openecomp/dcae/apod/analytics/dmaap/service/publisher/DMaaPMRPublisherFactory.java b/dcae-analytics-dmaap/src/main/java/org/openecomp/dcae/apod/analytics/dmaap/service/publisher/DMaaPMRPublisherFactory.java index 0e71559..5d6dfa1 100644 --- a/dcae-analytics-dmaap/src/main/java/org/openecomp/dcae/apod/analytics/dmaap/service/publisher/DMaaPMRPublisherFactory.java +++ b/dcae-analytics-dmaap/src/main/java/org/openecomp/dcae/apod/analytics/dmaap/service/publisher/DMaaPMRPublisherFactory.java @@ -1,49 +1,49 @@ -/* - * ===============================LICENSE_START====================================== - * dcae-analytics - * ================================================================================ - * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============================LICENSE_END=========================================== - */ - -package org.openecomp.dcae.apod.analytics.dmaap.service.publisher; - -import org.openecomp.dcae.apod.analytics.dmaap.DMaaPMRFactory; -import org.openecomp.dcae.apod.analytics.dmaap.domain.config.DMaaPMRPublisherConfig; - -/** - * <p> - * Factory to initialize instance of {@link DMaaPMRPublisher} for Guice DI injection purposes. - * <p> - * <strong> - * NOTE: Client should not use this Factory to initialize {@link DMaaPMRPublisher} unless they - * are wiring dependencies using Guice. Client must use {@link DMaaPMRFactory} to initialize - * guice injected Publisher instances - * </strong> - * <p> - * @author Rajiv Singla . Creation Date: 10/20/2016. - */ -public interface DMaaPMRPublisherFactory { - - /** - * Guice Factory to create DMaaP MR Publisher - * - * @param publisherConfig publisher config - * - * @return DMaaP MR Publisher instance - */ - DMaaPMRPublisher create(DMaaPMRPublisherConfig publisherConfig); - -} +/*
+ * ===============================LICENSE_START======================================
+ * dcae-analytics
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============================LICENSE_END===========================================
+ */
+
+package org.openecomp.dcae.apod.analytics.dmaap.service.publisher;
+
+import org.openecomp.dcae.apod.analytics.dmaap.DMaaPMRFactory;
+import org.openecomp.dcae.apod.analytics.dmaap.domain.config.DMaaPMRPublisherConfig;
+
+/**
+ * <p>
+ * Factory to initialize instance of {@link DMaaPMRPublisher} for Guice DI injection purposes.
+ * <p>
+ * <strong>
+ * NOTE: Client should not use this Factory to initialize {@link DMaaPMRPublisher} unless they
+ * are wiring dependencies using Guice. Client must use {@link DMaaPMRFactory} to initialize
+ * guice injected Publisher instances
+ * </strong>
+ * <p>
+ * @author Rajiv Singla . Creation Date: 10/20/2016.
+ */
+public interface DMaaPMRPublisherFactory {
+
+ /**
+ * Guice Factory to create DMaaP MR Publisher
+ *
+ * @param publisherConfig publisher config
+ *
+ * @return DMaaP MR Publisher instance
+ */
+ DMaaPMRPublisher create(DMaaPMRPublisherConfig publisherConfig);
+
+}
diff --git a/dcae-analytics-dmaap/src/main/java/org/openecomp/dcae/apod/analytics/dmaap/service/publisher/DMaaPMRPublisherImpl.java b/dcae-analytics-dmaap/src/main/java/org/openecomp/dcae/apod/analytics/dmaap/service/publisher/DMaaPMRPublisherImpl.java index 1901189..b3e303e 100644 --- a/dcae-analytics-dmaap/src/main/java/org/openecomp/dcae/apod/analytics/dmaap/service/publisher/DMaaPMRPublisherImpl.java +++ b/dcae-analytics-dmaap/src/main/java/org/openecomp/dcae/apod/analytics/dmaap/service/publisher/DMaaPMRPublisherImpl.java @@ -1,209 +1,209 @@ -/* - * ===============================LICENSE_START====================================== - * dcae-analytics - * ================================================================================ - * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============================LICENSE_END=========================================== - */ - -package org.openecomp.dcae.apod.analytics.dmaap.service.publisher; - -import com.google.common.base.Optional; -import com.google.common.collect.Iterables; -import com.google.common.collect.Lists; -import com.google.inject.Inject; -import com.google.inject.assistedinject.Assisted; -import org.apache.commons.lang3.tuple.Pair; -import org.apache.http.HttpHeaders; -import org.apache.http.client.HttpClient; -import org.apache.http.client.methods.HttpPost; -import org.apache.http.entity.ContentType; -import org.apache.http.entity.StringEntity; -import org.apache.http.impl.client.CloseableHttpClient; -import org.openecomp.dcae.apod.analytics.common.AnalyticsConstants; -import org.openecomp.dcae.apod.analytics.common.exception.DCAEAnalyticsRuntimeException; -import org.openecomp.dcae.apod.analytics.dmaap.domain.config.DMaaPMRPublisherConfig; -import org.openecomp.dcae.apod.analytics.dmaap.domain.response.DMaaPMRPublisherResponse; -import org.openecomp.dcae.apod.analytics.dmaap.service.BaseDMaaPMRComponent; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.net.URI; -import java.util.Date; -import java.util.List; - -import static org.openecomp.dcae.apod.analytics.common.utils.HTTPUtils.isSuccessfulResponseCode; -import static java.lang.String.format; - -/** - * Concrete Implementation of {@link DMaaPMRPublisher} which uses {@link HttpClient} - * - * @author Rajiv Singla . Creation Date: 10/13/2016. - */ -public class DMaaPMRPublisherImpl extends BaseDMaaPMRComponent implements DMaaPMRPublisher { - - private static final Logger LOG = LoggerFactory.getLogger(DMaaPMRPublisherImpl.class); - - private final DMaaPMRPublisherConfig publisherConfig; - private final CloseableHttpClient closeableHttpClient; - private final DMaaPMRPublisherQueue publisherQueue; - private final Date publisherCreationTime; - private URI publisherUri; - - @Inject - public DMaaPMRPublisherImpl(@Assisted DMaaPMRPublisherConfig publisherConfig, - DMaaPMRPublisherQueueFactory dMaaPMRPublisherQueueFactory, - CloseableHttpClient closeableHttpClient) { - - this.publisherConfig = publisherConfig; - this.publisherQueue = dMaaPMRPublisherQueueFactory.create( - publisherConfig.getMaxBatchSize(), publisherConfig.getMaxRecoveryQueueSize()); - this.closeableHttpClient = closeableHttpClient; - this.publisherUri = createPublisherURI(publisherConfig); - this.publisherCreationTime = new Date(); - } - - - @Override - public DMaaPMRPublisherResponse publish(List<String> messages) { - - final int batchQueueRemainingSize = publisherQueue.getBatchQueueRemainingSize(); - - // if messages size is less than batch queue size - just queue them for batch publishing - if (batchQueueRemainingSize > messages.size()) { - LOG.debug("Adding messages to batch Queue. No flushing required. Messages Size:{}. Batch Queue Size:{}", - messages.size(), batchQueueRemainingSize); - final int batchQueueSize = publisherQueue.addBatchMessages(messages); - return createPublisherAcceptedResponse(batchQueueSize); - - } else { - - // grab all already queued messages, append current messages and force publish them to DMaaP MR topic - final List<String> queueMessages = publisherQueue.getMessageForPublishing(); - LOG.debug("Batch Queue capacity exceeds messages size. Flushing of all pending messages to DMaaP MR " + - "Publisher Topic."); - return forcePublish(Lists.newLinkedList(Iterables.concat(queueMessages, messages))); - } - - } - - @Override - public DMaaPMRPublisherResponse forcePublish(List<String> messages) { - - LOG.debug("Force publishing messages to DMaaP MR Topic. Messages Size: {}", messages.size()); - - final String contentType = publisherConfig.getContentType(); - final String userName = publisherConfig.getUserName(); - final String userPassword = publisherConfig.getUserPassword(); - final HttpPost postRequest = new HttpPost(publisherUri); - - // add Authorization Header if username and password are present - final Optional<String> authHeader = getAuthHeader(userName, userPassword); - if (authHeader.isPresent()) { - postRequest.addHeader(HttpHeaders.AUTHORIZATION, authHeader.get()); - } else { - LOG.debug("DMaaP MR Publisher Authentication is disabled as username or password is not present."); - } - - // Create post string entity - final String messagesJson = convertToJsonString(messages); - final StringEntity requestEntity = - new StringEntity(messagesJson, ContentType.create(contentType, "UTF-8")); - postRequest.setEntity(requestEntity); - - try { - final Pair<Integer, String> responsePair = closeableHttpClient.execute(postRequest, responseHandler()); - final Integer responseCode = responsePair.getLeft(); - final String responseBody = responsePair.getRight(); - // if messages were published successfully, return successful response - if (isSuccessfulResponseCode(responseCode)) { - LOG.debug("DMaaP MR Messages published successfully. DMaaP Response Code: {}. DMaaP Response " + - "Body: {}, Number of Messages published: {}", - responseCode, responseBody, messages.size()); - - } else { - LOG.warn("Unable to publish messages to DMaaP MR Topic. DMaaP Response Code: {}, DMaaP Response " + - "Body: {}. Messages will be queued in recovery queue", responseCode, responseBody); - addMessagesToRecoveryQueue(publisherQueue, messages); - } - - return createPublisherResponse(responseCode, responseBody, - getPendingMessages(publisherQueue, publisherConfig)); - - } catch (IOException e) { - // If IO Error then we need to also put messages in recovery queue - addMessagesToRecoveryQueue(publisherQueue, messages); - final String errorMessage = format("IO Exception while publishing messages to DMaaP Topic. " + - "Messages will be queued in recovery queue. Messages Size: %d", messages.size()); - - throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e); - } - - } - - - @Override - public DMaaPMRPublisherResponse flush() { - final List<String> queueMessages = publisherQueue.getMessageForPublishing(); - // If there are no message return 204 (No Content) response code - if (queueMessages.isEmpty()) { - LOG.debug("No messages to publish to batch queue. Returning 204 status code"); - return createPublisherNoContentResponse(); - } else { - // force publish messages in queue - return forcePublish(queueMessages); - } - } - - @Override - public Date getPublisherCreationTime() { - return new Date(publisherCreationTime.getTime()); - } - - @Override - public void close() throws Exception { - - // flush current message in the queue - int retrialNumber = 0; - int flushResponseCode; - - // automatic retries if messages cannot be flushed - do { - retrialNumber++; - DMaaPMRPublisherResponse flushResponse = flush(); - flushResponseCode = flushResponse.getResponseCode(); - - if (!isSuccessfulResponseCode(flushResponseCode)) { - LOG.warn("Unable to flush batch messages to publisher due to DMaaP MR invalid Response: {}. " + - "Retrial No: {} of Max {} Retries", flushResponseCode, retrialNumber, - AnalyticsConstants.PUBLISHER_MAX_FLUSH_RETRIES_ON_CLOSE); - - Thread.sleep(AnalyticsConstants.PUBLISHER_DELAY_MS_ON_RETRIES_ON_CLOSE); - } - } while (retrialNumber <= AnalyticsConstants.PUBLISHER_MAX_FLUSH_RETRIES_ON_CLOSE && - !isSuccessfulResponseCode(flushResponseCode)); - - if (!isSuccessfulResponseCode(flushResponseCode)) { - LOG.error("Unable to flush batch messages to publisher. Messages loss cannot be prevented"); - } else { - LOG.info("Successfully published all batched messages to publisher."); - } - - // close http client - closeableHttpClient.close(); - - } -} +/*
+ * ===============================LICENSE_START======================================
+ * dcae-analytics
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============================LICENSE_END===========================================
+ */
+
+package org.openecomp.dcae.apod.analytics.dmaap.service.publisher;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.inject.Inject;
+import com.google.inject.assistedinject.Assisted;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.http.HttpHeaders;
+import org.apache.http.client.HttpClient;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.entity.ContentType;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.openecomp.dcae.apod.analytics.common.AnalyticsConstants;
+import org.openecomp.dcae.apod.analytics.common.exception.DCAEAnalyticsRuntimeException;
+import org.openecomp.dcae.apod.analytics.dmaap.domain.config.DMaaPMRPublisherConfig;
+import org.openecomp.dcae.apod.analytics.dmaap.domain.response.DMaaPMRPublisherResponse;
+import org.openecomp.dcae.apod.analytics.dmaap.service.BaseDMaaPMRComponent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Date;
+import java.util.List;
+
+import static org.openecomp.dcae.apod.analytics.common.utils.HTTPUtils.isSuccessfulResponseCode;
+import static java.lang.String.format;
+
+/**
+ * Concrete Implementation of {@link DMaaPMRPublisher} which uses {@link HttpClient}
+ *
+ * @author Rajiv Singla . Creation Date: 10/13/2016.
+ */
+public class DMaaPMRPublisherImpl extends BaseDMaaPMRComponent implements DMaaPMRPublisher {
+
+ private static final Logger LOG = LoggerFactory.getLogger(DMaaPMRPublisherImpl.class);
+
+ private final DMaaPMRPublisherConfig publisherConfig;
+ private final CloseableHttpClient closeableHttpClient;
+ private final DMaaPMRPublisherQueue publisherQueue;
+ private final Date publisherCreationTime;
+ private URI publisherUri;
+
+ @Inject
+ public DMaaPMRPublisherImpl(@Assisted DMaaPMRPublisherConfig publisherConfig,
+ DMaaPMRPublisherQueueFactory dMaaPMRPublisherQueueFactory,
+ CloseableHttpClient closeableHttpClient) {
+
+ this.publisherConfig = publisherConfig;
+ this.publisherQueue = dMaaPMRPublisherQueueFactory.create(
+ publisherConfig.getMaxBatchSize(), publisherConfig.getMaxRecoveryQueueSize());
+ this.closeableHttpClient = closeableHttpClient;
+ this.publisherUri = createPublisherURI(publisherConfig);
+ this.publisherCreationTime = new Date();
+ }
+
+
+ @Override
+ public DMaaPMRPublisherResponse publish(List<String> messages) {
+
+ final int batchQueueRemainingSize = publisherQueue.getBatchQueueRemainingSize();
+
+ // if messages size is less than batch queue size - just queue them for batch publishing
+ if (batchQueueRemainingSize > messages.size()) {
+ LOG.debug("Adding messages to batch Queue. No flushing required. Messages Size:{}. Batch Queue Size:{}",
+ messages.size(), batchQueueRemainingSize);
+ final int batchQueueSize = publisherQueue.addBatchMessages(messages);
+ return createPublisherAcceptedResponse(batchQueueSize);
+
+ } else {
+
+ // grab all already queued messages, append current messages and force publish them to DMaaP MR topic
+ final List<String> queueMessages = publisherQueue.getMessageForPublishing();
+ LOG.debug("Batch Queue capacity exceeds messages size. Flushing of all pending messages to DMaaP MR " +
+ "Publisher Topic.");
+ return forcePublish(Lists.newLinkedList(Iterables.concat(queueMessages, messages)));
+ }
+
+ }
+
+ @Override
+ public DMaaPMRPublisherResponse forcePublish(List<String> messages) {
+
+ LOG.debug("Force publishing messages to DMaaP MR Topic. Messages Size: {}", messages.size());
+
+ final String contentType = publisherConfig.getContentType();
+ final String userName = publisherConfig.getUserName();
+ final String userPassword = publisherConfig.getUserPassword();
+ final HttpPost postRequest = new HttpPost(publisherUri);
+
+ // add Authorization Header if username and password are present
+ final Optional<String> authHeader = getAuthHeader(userName, userPassword);
+ if (authHeader.isPresent()) {
+ postRequest.addHeader(HttpHeaders.AUTHORIZATION, authHeader.get());
+ } else {
+ LOG.debug("DMaaP MR Publisher Authentication is disabled as username or password is not present.");
+ }
+
+ // Create post string entity
+ final String messagesJson = convertToJsonString(messages);
+ final StringEntity requestEntity =
+ new StringEntity(messagesJson, ContentType.create(contentType, "UTF-8"));
+ postRequest.setEntity(requestEntity);
+
+ try {
+ final Pair<Integer, String> responsePair = closeableHttpClient.execute(postRequest, responseHandler());
+ final Integer responseCode = responsePair.getLeft();
+ final String responseBody = responsePair.getRight();
+ // if messages were published successfully, return successful response
+ if (isSuccessfulResponseCode(responseCode)) {
+ LOG.debug("DMaaP MR Messages published successfully. DMaaP Response Code: {}. DMaaP Response " +
+ "Body: {}, Number of Messages published: {}",
+ responseCode, responseBody, messages.size());
+
+ } else {
+ LOG.warn("Unable to publish messages to DMaaP MR Topic. DMaaP Response Code: {}, DMaaP Response " +
+ "Body: {}. Messages will be queued in recovery queue", responseCode, responseBody);
+ addMessagesToRecoveryQueue(publisherQueue, messages);
+ }
+
+ return createPublisherResponse(responseCode, responseBody,
+ getPendingMessages(publisherQueue, publisherConfig));
+
+ } catch (IOException e) {
+ // If IO Error then we need to also put messages in recovery queue
+ addMessagesToRecoveryQueue(publisherQueue, messages);
+ final String errorMessage = format("IO Exception while publishing messages to DMaaP Topic. " +
+ "Messages will be queued in recovery queue. Messages Size: %d", messages.size());
+
+ throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e);
+ }
+
+ }
+
+
+ @Override
+ public DMaaPMRPublisherResponse flush() {
+ final List<String> queueMessages = publisherQueue.getMessageForPublishing();
+ // If there are no message return 204 (No Content) response code
+ if (queueMessages.isEmpty()) {
+ LOG.debug("No messages to publish to batch queue. Returning 204 status code");
+ return createPublisherNoContentResponse();
+ } else {
+ // force publish messages in queue
+ return forcePublish(queueMessages);
+ }
+ }
+
+ @Override
+ public Date getPublisherCreationTime() {
+ return new Date(publisherCreationTime.getTime());
+ }
+
+ @Override
+ public void close() throws Exception {
+
+ // flush current message in the queue
+ int retrialNumber = 0;
+ int flushResponseCode;
+
+ // automatic retries if messages cannot be flushed
+ do {
+ retrialNumber++;
+ DMaaPMRPublisherResponse flushResponse = flush();
+ flushResponseCode = flushResponse.getResponseCode();
+
+ if (!isSuccessfulResponseCode(flushResponseCode)) {
+ LOG.warn("Unable to flush batch messages to publisher due to DMaaP MR invalid Response: {}. " +
+ "Retrial No: {} of Max {} Retries", flushResponseCode, retrialNumber,
+ AnalyticsConstants.PUBLISHER_MAX_FLUSH_RETRIES_ON_CLOSE);
+
+ Thread.sleep(AnalyticsConstants.PUBLISHER_DELAY_MS_ON_RETRIES_ON_CLOSE);
+ }
+ } while (retrialNumber <= AnalyticsConstants.PUBLISHER_MAX_FLUSH_RETRIES_ON_CLOSE &&
+ !isSuccessfulResponseCode(flushResponseCode));
+
+ if (!isSuccessfulResponseCode(flushResponseCode)) {
+ LOG.error("Unable to flush batch messages to publisher. Messages loss cannot be prevented");
+ } else {
+ LOG.info("Successfully published all batched messages to publisher.");
+ }
+
+ // close http client
+ closeableHttpClient.close();
+
+ }
+}
diff --git a/dcae-analytics-dmaap/src/main/java/org/openecomp/dcae/apod/analytics/dmaap/service/publisher/DMaaPMRPublisherQueue.java b/dcae-analytics-dmaap/src/main/java/org/openecomp/dcae/apod/analytics/dmaap/service/publisher/DMaaPMRPublisherQueue.java index 281af53..3877f0b 100644 --- a/dcae-analytics-dmaap/src/main/java/org/openecomp/dcae/apod/analytics/dmaap/service/publisher/DMaaPMRPublisherQueue.java +++ b/dcae-analytics-dmaap/src/main/java/org/openecomp/dcae/apod/analytics/dmaap/service/publisher/DMaaPMRPublisherQueue.java @@ -1,87 +1,87 @@ -/* - * ===============================LICENSE_START====================================== - * dcae-analytics - * ================================================================================ - * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============================LICENSE_END=========================================== - */ - -package org.openecomp.dcae.apod.analytics.dmaap.service.publisher; - -import java.util.List; - -/** - * <p> - * DMaaP MR Publisher Queue handles back pressure in case DMaaP MR Publisher topic - * is offline for some reason. It does so by having a recovery queue which keeps - * messages in order in case there is temporary interruption in DMaaP Publisher - * </p> - * - * @author Rajiv Singla . Creation Date: 11/1/2016. - */ -public interface DMaaPMRPublisherQueue { - - /** - * <p> - * Add batchMessages to Batch Queue - * </p> - * - * @param batchMessages messages that needs to be added to batch queue - * @return current size of batch queue. Throws {@link IllegalStateException} - * if batch queue does not have enough space - */ - int addBatchMessages(List<String> batchMessages); - - - /** - * <p> - * Add recoverable messages to Recoverable Queue - * </p> - * - * @param recoverableMessages messages that needs to be added to recoverable queue - * @return current size of the recoverable queue. Throws {@link IllegalStateException} - * if recoverable queue does not have enough space - */ - int addRecoverableMessages(List<String> recoverableMessages); - - /** - * <p> - * Get messages that need to be published to DMaaP topic. Messages in recoverable - * queue are appended if present. - * </p> - * - * @return List of messages from both batch and recovery queue - */ - List<String> getMessageForPublishing(); - - /** - * <p> - * Remaining capacity of Batch Queue - * </p> - * - * @return Remaining Batch Queue Size - */ - int getBatchQueueRemainingSize(); - - /** - * <p> - * Remaining capacity of Recovery Queue - * </p> - * - * @return Remaining Recovery Queue Size - */ - int getRecoveryQueueRemainingSize(); - -} +/*
+ * ===============================LICENSE_START======================================
+ * dcae-analytics
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============================LICENSE_END===========================================
+ */
+
+package org.openecomp.dcae.apod.analytics.dmaap.service.publisher;
+
+import java.util.List;
+
+/**
+ * <p>
+ * DMaaP MR Publisher Queue handles back pressure in case DMaaP MR Publisher topic
+ * is offline for some reason. It does so by having a recovery queue which keeps
+ * messages in order in case there is temporary interruption in DMaaP Publisher
+ * </p>
+ *
+ * @author Rajiv Singla . Creation Date: 11/1/2016.
+ */
+public interface DMaaPMRPublisherQueue {
+
+ /**
+ * <p>
+ * Add batchMessages to Batch Queue
+ * </p>
+ *
+ * @param batchMessages messages that needs to be added to batch queue
+ * @return current size of batch queue. Throws {@link IllegalStateException}
+ * if batch queue does not have enough space
+ */
+ int addBatchMessages(List<String> batchMessages);
+
+
+ /**
+ * <p>
+ * Add recoverable messages to Recoverable Queue
+ * </p>
+ *
+ * @param recoverableMessages messages that needs to be added to recoverable queue
+ * @return current size of the recoverable queue. Throws {@link IllegalStateException}
+ * if recoverable queue does not have enough space
+ */
+ int addRecoverableMessages(List<String> recoverableMessages);
+
+ /**
+ * <p>
+ * Get messages that need to be published to DMaaP topic. Messages in recoverable
+ * queue are appended if present.
+ * </p>
+ *
+ * @return List of messages from both batch and recovery queue
+ */
+ List<String> getMessageForPublishing();
+
+ /**
+ * <p>
+ * Remaining capacity of Batch Queue
+ * </p>
+ *
+ * @return Remaining Batch Queue Size
+ */
+ int getBatchQueueRemainingSize();
+
+ /**
+ * <p>
+ * Remaining capacity of Recovery Queue
+ * </p>
+ *
+ * @return Remaining Recovery Queue Size
+ */
+ int getRecoveryQueueRemainingSize();
+
+}
diff --git a/dcae-analytics-dmaap/src/main/java/org/openecomp/dcae/apod/analytics/dmaap/service/publisher/DMaaPMRPublisherQueueFactory.java b/dcae-analytics-dmaap/src/main/java/org/openecomp/dcae/apod/analytics/dmaap/service/publisher/DMaaPMRPublisherQueueFactory.java index 3ebc9dc..8d44f93 100644 --- a/dcae-analytics-dmaap/src/main/java/org/openecomp/dcae/apod/analytics/dmaap/service/publisher/DMaaPMRPublisherQueueFactory.java +++ b/dcae-analytics-dmaap/src/main/java/org/openecomp/dcae/apod/analytics/dmaap/service/publisher/DMaaPMRPublisherQueueFactory.java @@ -1,45 +1,45 @@ -/* - * ===============================LICENSE_START====================================== - * dcae-analytics - * ================================================================================ - * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============================LICENSE_END=========================================== - */ - -package org.openecomp.dcae.apod.analytics.dmaap.service.publisher; - -import com.google.inject.assistedinject.Assisted; - -/** - * <p> - * Factory to initialize instance of {@link DMaaPMRPublisherQueue} for Guice DI injection purposes. - * <p> - * - * @author Rajiv Singla . Creation Date: 11/1/2016. - */ -public interface DMaaPMRPublisherQueueFactory { - - /** - * Guice Factory to create DMaaP MR Publisher Queue - * - * @param batchQueueSize batch queue size - * @param recoveryQueueSize recovery queue size - * - * @return instance of DMaaP MR Publisher Queue - */ - DMaaPMRPublisherQueue create(@Assisted("batchQueueSize") int batchQueueSize, - @Assisted("recoveryQueueSize") int recoveryQueueSize); - -} +/*
+ * ===============================LICENSE_START======================================
+ * dcae-analytics
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============================LICENSE_END===========================================
+ */
+
+package org.openecomp.dcae.apod.analytics.dmaap.service.publisher;
+
+import com.google.inject.assistedinject.Assisted;
+
+/**
+ * <p>
+ * Factory to initialize instance of {@link DMaaPMRPublisherQueue} for Guice DI injection purposes.
+ * <p>
+ *
+ * @author Rajiv Singla . Creation Date: 11/1/2016.
+ */
+public interface DMaaPMRPublisherQueueFactory {
+
+ /**
+ * Guice Factory to create DMaaP MR Publisher Queue
+ *
+ * @param batchQueueSize batch queue size
+ * @param recoveryQueueSize recovery queue size
+ *
+ * @return instance of DMaaP MR Publisher Queue
+ */
+ DMaaPMRPublisherQueue create(@Assisted("batchQueueSize") int batchQueueSize,
+ @Assisted("recoveryQueueSize") int recoveryQueueSize);
+
+}
diff --git a/dcae-analytics-dmaap/src/main/java/org/openecomp/dcae/apod/analytics/dmaap/service/publisher/DMaaPMRPublisherQueueImpl.java b/dcae-analytics-dmaap/src/main/java/org/openecomp/dcae/apod/analytics/dmaap/service/publisher/DMaaPMRPublisherQueueImpl.java index 936abe3..e42b6b0 100644 --- a/dcae-analytics-dmaap/src/main/java/org/openecomp/dcae/apod/analytics/dmaap/service/publisher/DMaaPMRPublisherQueueImpl.java +++ b/dcae-analytics-dmaap/src/main/java/org/openecomp/dcae/apod/analytics/dmaap/service/publisher/DMaaPMRPublisherQueueImpl.java @@ -1,126 +1,126 @@ -/* - * ===============================LICENSE_START====================================== - * dcae-analytics - * ================================================================================ - * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============================LICENSE_END=========================================== - */ - -package org.openecomp.dcae.apod.analytics.dmaap.service.publisher; - -import com.google.inject.Inject; -import com.google.inject.assistedinject.Assisted; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.LinkedList; -import java.util.List; -import java.util.concurrent.LinkedBlockingDeque; - -import static com.google.common.collect.Iterables.concat; -import static com.google.common.collect.Lists.newLinkedList; -import static java.util.Collections.unmodifiableList; - -/** - * <p> - * An implementation of {@link DMaaPMRPublisherQueue} which uses {@link java.util.concurrent.BlockingDeque} - * for batch and recovery queues - * </p> - * - * - * @author Rajiv Singla . Creation Date: 11/1/2016. - */ -public class DMaaPMRPublisherQueueImpl implements DMaaPMRPublisherQueue { - - private static final Logger LOG = LoggerFactory.getLogger(DMaaPMRPublisherQueueImpl.class); - - private final LinkedBlockingDeque<String> batchQueue; - private final LinkedBlockingDeque<String> recoveryQueue; - - @Inject - public DMaaPMRPublisherQueueImpl(@Assisted("batchQueueSize") int batchQueueSize, - @Assisted("recoveryQueueSize") int recoveryQueueSize) { - batchQueue = new LinkedBlockingDeque<>(batchQueueSize); - recoveryQueue = new LinkedBlockingDeque<>(recoveryQueueSize); - LOG.debug("Creating Instance of DMaaP Publisher Queue. BatchQueueSize: {}, RecoveryQueueSize: {}", - batchQueueSize, recoveryQueueSize); - } - - @Override - public synchronized int addBatchMessages(List<String> batchMessages) { - - // checks if batchMessages size does not exceed batch queue capacity - if (batchMessages.size() > batchQueue.remainingCapacity()) { - throw new IllegalStateException("Not enough capacity to add batchMessages in batch queue"); - } - - // Add batchMessages to batch queue - for (String message : batchMessages) { - batchQueue.add(message); - } - - // returns current elements size in batch queue - return batchQueue.size(); - } - - @Override - public synchronized int addRecoverableMessages(List<String> recoverableMessages) { - - // checks if messages size does not exceed recovery queue size - if (recoverableMessages.size() > recoveryQueue.remainingCapacity()) { - throw new IllegalStateException("Not enough capacity to add messages in recovery queue"); - } - - // add messages to recovery queue - for (String recoverableMessage : recoverableMessages) { - recoveryQueue.add(recoverableMessage); - } - - // returns current size of recovery queue - return recoveryQueue.size(); - } - - @Override - public synchronized List<String> getMessageForPublishing() { - - final List<String> recoveryMessageList = new LinkedList<>(); - final List<String> batchMessagesList = new LinkedList<>(); - - // get messages from recovery queue if present - if (!recoveryQueue.isEmpty()) { - final int recoveryQueueSize = recoveryQueue.drainTo(recoveryMessageList); - LOG.debug("Drained Recovery Queue elements for flushing: {}", recoveryQueueSize); - } - - // get messages from batch queue if present - if (!batchQueue.isEmpty()) { - final int batchQueueSize = batchQueue.drainTo(batchMessagesList); - LOG.debug("Drained Batch Queue elements for flushing: {}", batchQueueSize); - } - - // concat recovery and batch queue elements - return unmodifiableList(newLinkedList(concat(recoveryMessageList, batchMessagesList))); - } - - @Override - public synchronized int getBatchQueueRemainingSize() { - return batchQueue.remainingCapacity(); - } - - @Override - public synchronized int getRecoveryQueueRemainingSize() { - return recoveryQueue.remainingCapacity(); - } -} +/*
+ * ===============================LICENSE_START======================================
+ * dcae-analytics
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============================LICENSE_END===========================================
+ */
+
+package org.openecomp.dcae.apod.analytics.dmaap.service.publisher;
+
+import com.google.inject.Inject;
+import com.google.inject.assistedinject.Assisted;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.LinkedBlockingDeque;
+
+import static com.google.common.collect.Iterables.concat;
+import static com.google.common.collect.Lists.newLinkedList;
+import static java.util.Collections.unmodifiableList;
+
+/**
+ * <p>
+ * An implementation of {@link DMaaPMRPublisherQueue} which uses {@link java.util.concurrent.BlockingDeque}
+ * for batch and recovery queues
+ * </p>
+ *
+ *
+ * @author Rajiv Singla . Creation Date: 11/1/2016.
+ */
+public class DMaaPMRPublisherQueueImpl implements DMaaPMRPublisherQueue {
+
+ private static final Logger LOG = LoggerFactory.getLogger(DMaaPMRPublisherQueueImpl.class);
+
+ private final LinkedBlockingDeque<String> batchQueue;
+ private final LinkedBlockingDeque<String> recoveryQueue;
+
+ @Inject
+ public DMaaPMRPublisherQueueImpl(@Assisted("batchQueueSize") int batchQueueSize,
+ @Assisted("recoveryQueueSize") int recoveryQueueSize) {
+ batchQueue = new LinkedBlockingDeque<>(batchQueueSize);
+ recoveryQueue = new LinkedBlockingDeque<>(recoveryQueueSize);
+ LOG.debug("Creating Instance of DMaaP Publisher Queue. BatchQueueSize: {}, RecoveryQueueSize: {}",
+ batchQueueSize, recoveryQueueSize);
+ }
+
+ @Override
+ public synchronized int addBatchMessages(List<String> batchMessages) {
+
+ // checks if batchMessages size does not exceed batch queue capacity
+ if (batchMessages.size() > batchQueue.remainingCapacity()) {
+ throw new IllegalStateException("Not enough capacity to add batchMessages in batch queue");
+ }
+
+ // Add batchMessages to batch queue
+ for (String message : batchMessages) {
+ batchQueue.add(message);
+ }
+
+ // returns current elements size in batch queue
+ return batchQueue.size();
+ }
+
+ @Override
+ public synchronized int addRecoverableMessages(List<String> recoverableMessages) {
+
+ // checks if messages size does not exceed recovery queue size
+ if (recoverableMessages.size() > recoveryQueue.remainingCapacity()) {
+ throw new IllegalStateException("Not enough capacity to add messages in recovery queue");
+ }
+
+ // add messages to recovery queue
+ for (String recoverableMessage : recoverableMessages) {
+ recoveryQueue.add(recoverableMessage);
+ }
+
+ // returns current size of recovery queue
+ return recoveryQueue.size();
+ }
+
+ @Override
+ public synchronized List<String> getMessageForPublishing() {
+
+ final List<String> recoveryMessageList = new LinkedList<>();
+ final List<String> batchMessagesList = new LinkedList<>();
+
+ // get messages from recovery queue if present
+ if (!recoveryQueue.isEmpty()) {
+ final int recoveryQueueSize = recoveryQueue.drainTo(recoveryMessageList);
+ LOG.debug("Drained Recovery Queue elements for flushing: {}", recoveryQueueSize);
+ }
+
+ // get messages from batch queue if present
+ if (!batchQueue.isEmpty()) {
+ final int batchQueueSize = batchQueue.drainTo(batchMessagesList);
+ LOG.debug("Drained Batch Queue elements for flushing: {}", batchQueueSize);
+ }
+
+ // concat recovery and batch queue elements
+ return unmodifiableList(newLinkedList(concat(recoveryMessageList, batchMessagesList)));
+ }
+
+ @Override
+ public synchronized int getBatchQueueRemainingSize() {
+ return batchQueue.remainingCapacity();
+ }
+
+ @Override
+ public synchronized int getRecoveryQueueRemainingSize() {
+ return recoveryQueue.remainingCapacity();
+ }
+}
diff --git a/dcae-analytics-dmaap/src/main/java/org/openecomp/dcae/apod/analytics/dmaap/service/subscriber/DMaaPMRSubscriber.java b/dcae-analytics-dmaap/src/main/java/org/openecomp/dcae/apod/analytics/dmaap/service/subscriber/DMaaPMRSubscriber.java index 7e4d040..f193b2c 100644 --- a/dcae-analytics-dmaap/src/main/java/org/openecomp/dcae/apod/analytics/dmaap/service/subscriber/DMaaPMRSubscriber.java +++ b/dcae-analytics-dmaap/src/main/java/org/openecomp/dcae/apod/analytics/dmaap/service/subscriber/DMaaPMRSubscriber.java @@ -1,57 +1,57 @@ -/* - * ===============================LICENSE_START====================================== - * dcae-analytics - * ================================================================================ - * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============================LICENSE_END=========================================== - */ - -package org.openecomp.dcae.apod.analytics.dmaap.service.subscriber; - -import org.openecomp.dcae.apod.analytics.dmaap.domain.config.DMaaPMRPublisherConfig; -import org.openecomp.dcae.apod.analytics.dmaap.domain.response.DMaaPMRSubscriberResponse; - -import java.util.Date; - -/** - * <p> - * DMaaP MR Subscriber can be used to subscribe messages from DMaaP MR Topics. - * <p> - * - * @author Rajiv Singla . Creation Date: 10/13/2016. - */ -public interface DMaaPMRSubscriber extends AutoCloseable { - - /** - * Fetches Messages from DMaaP MR Topic. {@link DMaaPMRPublisherConfig} settings parameters - * for messageLimit and message timeout are used - * - * @return DMaaP Message Router Subscriber Response - */ - DMaaPMRSubscriberResponse fetchMessages(); - - - /** - * Returns the Subscriber instance creation time - * <p> - * NOTE: Due to DMaaP API Design - Subscribers can only fetch messages which - * are published to the topic after the creation of the Subscriber. - * - * @return creation time of Subscriber instance - */ - Date getSubscriberCreationTime(); - - -} +/*
+ * ===============================LICENSE_START======================================
+ * dcae-analytics
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============================LICENSE_END===========================================
+ */
+
+package org.openecomp.dcae.apod.analytics.dmaap.service.subscriber;
+
+import org.openecomp.dcae.apod.analytics.dmaap.domain.config.DMaaPMRPublisherConfig;
+import org.openecomp.dcae.apod.analytics.dmaap.domain.response.DMaaPMRSubscriberResponse;
+
+import java.util.Date;
+
+/**
+ * <p>
+ * DMaaP MR Subscriber can be used to subscribe messages from DMaaP MR Topics.
+ * <p>
+ *
+ * @author Rajiv Singla . Creation Date: 10/13/2016.
+ */
+public interface DMaaPMRSubscriber extends AutoCloseable {
+
+ /**
+ * Fetches Messages from DMaaP MR Topic. {@link DMaaPMRPublisherConfig} settings parameters
+ * for messageLimit and message timeout are used
+ *
+ * @return DMaaP Message Router Subscriber Response
+ */
+ DMaaPMRSubscriberResponse fetchMessages();
+
+
+ /**
+ * Returns the Subscriber instance creation time
+ * <p>
+ * NOTE: Due to DMaaP API Design - Subscribers can only fetch messages which
+ * are published to the topic after the creation of the Subscriber.
+ *
+ * @return creation time of Subscriber instance
+ */
+ Date getSubscriberCreationTime();
+
+
+}
diff --git a/dcae-analytics-dmaap/src/main/java/org/openecomp/dcae/apod/analytics/dmaap/service/subscriber/DMaaPMRSubscriberFactory.java b/dcae-analytics-dmaap/src/main/java/org/openecomp/dcae/apod/analytics/dmaap/service/subscriber/DMaaPMRSubscriberFactory.java index 4232952..b7f5ada 100644 --- a/dcae-analytics-dmaap/src/main/java/org/openecomp/dcae/apod/analytics/dmaap/service/subscriber/DMaaPMRSubscriberFactory.java +++ b/dcae-analytics-dmaap/src/main/java/org/openecomp/dcae/apod/analytics/dmaap/service/subscriber/DMaaPMRSubscriberFactory.java @@ -1,47 +1,47 @@ -/* - * ===============================LICENSE_START====================================== - * dcae-analytics - * ================================================================================ - * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============================LICENSE_END=========================================== - */ - -package org.openecomp.dcae.apod.analytics.dmaap.service.subscriber; - -import org.openecomp.dcae.apod.analytics.dmaap.DMaaPMRFactory; -import org.openecomp.dcae.apod.analytics.dmaap.domain.config.DMaaPMRSubscriberConfig; - -/** - * Factory to initialize instance of {@link DMaaPMRSubscriber} for Guice DI injection purposes. - * <p> - * <strong> - * NOTE: Client should not use this Factory to initialize {@link DMaaPMRSubscriber} unless they - * are wiring dependencies using Guice. Client must use {@link DMaaPMRFactory} to initialize - * guice injected Subscriber instances - * </strong> - * <p> - * @author Rajiv Singla . Creation Date: 10/20/2016. - */ -public interface DMaaPMRSubscriberFactory { - - /** - * Guice Factory to create DMaaP MR Subscriber Instance - * - * @param subscriberConfig subscriber config - * - * @return DMaaP MR Subscriber instance - */ - DMaaPMRSubscriber create(DMaaPMRSubscriberConfig subscriberConfig); -} +/*
+ * ===============================LICENSE_START======================================
+ * dcae-analytics
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============================LICENSE_END===========================================
+ */
+
+package org.openecomp.dcae.apod.analytics.dmaap.service.subscriber;
+
+import org.openecomp.dcae.apod.analytics.dmaap.DMaaPMRFactory;
+import org.openecomp.dcae.apod.analytics.dmaap.domain.config.DMaaPMRSubscriberConfig;
+
+/**
+ * Factory to initialize instance of {@link DMaaPMRSubscriber} for Guice DI injection purposes.
+ * <p>
+ * <strong>
+ * NOTE: Client should not use this Factory to initialize {@link DMaaPMRSubscriber} unless they
+ * are wiring dependencies using Guice. Client must use {@link DMaaPMRFactory} to initialize
+ * guice injected Subscriber instances
+ * </strong>
+ * <p>
+ * @author Rajiv Singla . Creation Date: 10/20/2016.
+ */
+public interface DMaaPMRSubscriberFactory {
+
+ /**
+ * Guice Factory to create DMaaP MR Subscriber Instance
+ *
+ * @param subscriberConfig subscriber config
+ *
+ * @return DMaaP MR Subscriber instance
+ */
+ DMaaPMRSubscriber create(DMaaPMRSubscriberConfig subscriberConfig);
+}
diff --git a/dcae-analytics-dmaap/src/main/java/org/openecomp/dcae/apod/analytics/dmaap/service/subscriber/DMaaPMRSubscriberImpl.java b/dcae-analytics-dmaap/src/main/java/org/openecomp/dcae/apod/analytics/dmaap/service/subscriber/DMaaPMRSubscriberImpl.java index fd92b9f..678ff76 100644 --- a/dcae-analytics-dmaap/src/main/java/org/openecomp/dcae/apod/analytics/dmaap/service/subscriber/DMaaPMRSubscriberImpl.java +++ b/dcae-analytics-dmaap/src/main/java/org/openecomp/dcae/apod/analytics/dmaap/service/subscriber/DMaaPMRSubscriberImpl.java @@ -1,129 +1,129 @@ -/* - * ===============================LICENSE_START====================================== - * dcae-analytics - * ================================================================================ - * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============================LICENSE_END=========================================== - */ - -package org.openecomp.dcae.apod.analytics.dmaap.service.subscriber; - -import com.google.common.base.Optional; -import com.google.inject.Inject; -import com.google.inject.assistedinject.Assisted; -import org.apache.commons.lang3.tuple.Pair; -import org.apache.http.HttpHeaders; -import org.apache.http.client.HttpClient; -import org.apache.http.client.methods.HttpGet; -import org.apache.http.impl.client.CloseableHttpClient; -import org.openecomp.dcae.apod.analytics.common.exception.DCAEAnalyticsRuntimeException; -import org.openecomp.dcae.apod.analytics.dmaap.domain.config.DMaaPMRSubscriberConfig; -import org.openecomp.dcae.apod.analytics.dmaap.domain.response.DMaaPMRSubscriberResponse; -import org.openecomp.dcae.apod.analytics.dmaap.service.BaseDMaaPMRComponent; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.net.URI; -import java.util.Date; -import java.util.LinkedList; -import java.util.List; - -import static org.openecomp.dcae.apod.analytics.common.utils.HTTPUtils.isSuccessfulResponseCode; -import static java.lang.String.format; - -/** - * Concrete Implementation of {@link DMaaPMRSubscriber} which uses {@link HttpClient} - * - * @author Rajiv Singla . Creation Date: 10/13/2016. - */ -public class DMaaPMRSubscriberImpl extends BaseDMaaPMRComponent implements DMaaPMRSubscriber { - - private static final Logger LOG = LoggerFactory.getLogger(DMaaPMRSubscriberImpl.class); - - private final DMaaPMRSubscriberConfig subscriberConfig; - private final CloseableHttpClient closeableHttpClient; - private final URI subscriberUri; - private final Date subscriberCreationTime; - - @Inject - public DMaaPMRSubscriberImpl(@Assisted DMaaPMRSubscriberConfig subscriberConfig, - CloseableHttpClient closeableHttpClient) { - this.subscriberConfig = subscriberConfig; - this.closeableHttpClient = closeableHttpClient; - this.subscriberUri = createSubscriberURI(subscriberConfig); - this.subscriberCreationTime = new Date(); - } - - @Override - public DMaaPMRSubscriberResponse fetchMessages() { - - final String userName = subscriberConfig.getUserName(); - final String userPassword = subscriberConfig.getUserPassword(); - - final HttpGet getRequest = new HttpGet(subscriberUri); - - // add Authorization Header if username and password are present - final Optional<String> authHeader = getAuthHeader(userName, userPassword); - if (authHeader.isPresent()) { - getRequest.addHeader(HttpHeaders.AUTHORIZATION, authHeader.get()); - } else { - LOG.debug("DMaaP MR Subscriber Authentication is disabled as username or password is not present."); - } - - try { - - final Pair<Integer, String> responsePair = closeableHttpClient.execute(getRequest, responseHandler()); - final Integer responseCode = responsePair.getLeft(); - final String responseBody = responsePair.getRight(); - - List<String> fetchedMessages = new LinkedList<>(); - String responseMessage = responseBody; - - // if messages were published successfully, return successful response - if (isSuccessfulResponseCode(responseCode)) { - if (responseBody != null) { - fetchedMessages = convertJsonToStringMessages(responseBody); - responseMessage = "Messages Fetched Successfully"; - } else { - responseMessage = "DMaaP Response Body had no messages"; - } - } else { - LOG.error("Unable to fetch messages to DMaaP MR Topic. DMaaP MR unsuccessful Response Code: {}, " + - "DMaaP Response Body: {}", responseCode, responseBody); - } - - return createSubscriberResponse(responseCode, responseMessage, fetchedMessages); - - } catch (IOException e) { - - final String errorMessage = - format("IO Exception while fetching messages from DMaaP Topic. Exception %s", e); - throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e); - } - - - } - - @Override - public Date getSubscriberCreationTime() { - return new Date(subscriberCreationTime.getTime()); - } - - @Override - public void close() throws Exception { - closeableHttpClient.close(); - } -} +/*
+ * ===============================LICENSE_START======================================
+ * dcae-analytics
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============================LICENSE_END===========================================
+ */
+
+package org.openecomp.dcae.apod.analytics.dmaap.service.subscriber;
+
+import com.google.common.base.Optional;
+import com.google.inject.Inject;
+import com.google.inject.assistedinject.Assisted;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.http.HttpHeaders;
+import org.apache.http.client.HttpClient;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.openecomp.dcae.apod.analytics.common.exception.DCAEAnalyticsRuntimeException;
+import org.openecomp.dcae.apod.analytics.dmaap.domain.config.DMaaPMRSubscriberConfig;
+import org.openecomp.dcae.apod.analytics.dmaap.domain.response.DMaaPMRSubscriberResponse;
+import org.openecomp.dcae.apod.analytics.dmaap.service.BaseDMaaPMRComponent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Date;
+import java.util.LinkedList;
+import java.util.List;
+
+import static org.openecomp.dcae.apod.analytics.common.utils.HTTPUtils.isSuccessfulResponseCode;
+import static java.lang.String.format;
+
+/**
+ * Concrete Implementation of {@link DMaaPMRSubscriber} which uses {@link HttpClient}
+ *
+ * @author Rajiv Singla . Creation Date: 10/13/2016.
+ */
+public class DMaaPMRSubscriberImpl extends BaseDMaaPMRComponent implements DMaaPMRSubscriber {
+
+ private static final Logger LOG = LoggerFactory.getLogger(DMaaPMRSubscriberImpl.class);
+
+ private final DMaaPMRSubscriberConfig subscriberConfig;
+ private final CloseableHttpClient closeableHttpClient;
+ private final URI subscriberUri;
+ private final Date subscriberCreationTime;
+
+ @Inject
+ public DMaaPMRSubscriberImpl(@Assisted DMaaPMRSubscriberConfig subscriberConfig,
+ CloseableHttpClient closeableHttpClient) {
+ this.subscriberConfig = subscriberConfig;
+ this.closeableHttpClient = closeableHttpClient;
+ this.subscriberUri = createSubscriberURI(subscriberConfig);
+ this.subscriberCreationTime = new Date();
+ }
+
+ @Override
+ public DMaaPMRSubscriberResponse fetchMessages() {
+
+ final String userName = subscriberConfig.getUserName();
+ final String userPassword = subscriberConfig.getUserPassword();
+
+ final HttpGet getRequest = new HttpGet(subscriberUri);
+
+ // add Authorization Header if username and password are present
+ final Optional<String> authHeader = getAuthHeader(userName, userPassword);
+ if (authHeader.isPresent()) {
+ getRequest.addHeader(HttpHeaders.AUTHORIZATION, authHeader.get());
+ } else {
+ LOG.debug("DMaaP MR Subscriber Authentication is disabled as username or password is not present.");
+ }
+
+ try {
+
+ final Pair<Integer, String> responsePair = closeableHttpClient.execute(getRequest, responseHandler());
+ final Integer responseCode = responsePair.getLeft();
+ final String responseBody = responsePair.getRight();
+
+ List<String> fetchedMessages = new LinkedList<>();
+ String responseMessage = responseBody;
+
+ // if messages were published successfully, return successful response
+ if (isSuccessfulResponseCode(responseCode)) {
+ if (responseBody != null) {
+ fetchedMessages = convertJsonToStringMessages(responseBody);
+ responseMessage = "Messages Fetched Successfully";
+ } else {
+ responseMessage = "DMaaP Response Body had no messages";
+ }
+ } else {
+ LOG.error("Unable to fetch messages to DMaaP MR Topic. DMaaP MR unsuccessful Response Code: {}, " +
+ "DMaaP Response Body: {}", responseCode, responseBody);
+ }
+
+ return createSubscriberResponse(responseCode, responseMessage, fetchedMessages);
+
+ } catch (IOException e) {
+
+ final String errorMessage =
+ format("IO Exception while fetching messages from DMaaP Topic. Exception %s", e);
+ throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e);
+ }
+
+
+ }
+
+ @Override
+ public Date getSubscriberCreationTime() {
+ return new Date(subscriberCreationTime.getTime());
+ }
+
+ @Override
+ public void close() throws Exception {
+ closeableHttpClient.close();
+ }
+}
|