aboutsummaryrefslogtreecommitdiffstats
path: root/dcae-analytics-dmaap/src/main/java/org/openecomp/dcae/apod/analytics/dmaap/service/publisher
diff options
context:
space:
mode:
Diffstat (limited to 'dcae-analytics-dmaap/src/main/java/org/openecomp/dcae/apod/analytics/dmaap/service/publisher')
-rw-r--r--dcae-analytics-dmaap/src/main/java/org/openecomp/dcae/apod/analytics/dmaap/service/publisher/DMaaPMRPublisher.java190
-rw-r--r--dcae-analytics-dmaap/src/main/java/org/openecomp/dcae/apod/analytics/dmaap/service/publisher/DMaaPMRPublisherFactory.java98
-rw-r--r--dcae-analytics-dmaap/src/main/java/org/openecomp/dcae/apod/analytics/dmaap/service/publisher/DMaaPMRPublisherImpl.java418
-rw-r--r--dcae-analytics-dmaap/src/main/java/org/openecomp/dcae/apod/analytics/dmaap/service/publisher/DMaaPMRPublisherQueue.java174
-rw-r--r--dcae-analytics-dmaap/src/main/java/org/openecomp/dcae/apod/analytics/dmaap/service/publisher/DMaaPMRPublisherQueueFactory.java90
-rw-r--r--dcae-analytics-dmaap/src/main/java/org/openecomp/dcae/apod/analytics/dmaap/service/publisher/DMaaPMRPublisherQueueImpl.java252
6 files changed, 611 insertions, 611 deletions
diff --git a/dcae-analytics-dmaap/src/main/java/org/openecomp/dcae/apod/analytics/dmaap/service/publisher/DMaaPMRPublisher.java b/dcae-analytics-dmaap/src/main/java/org/openecomp/dcae/apod/analytics/dmaap/service/publisher/DMaaPMRPublisher.java
index 1d37786..48b0a70 100644
--- a/dcae-analytics-dmaap/src/main/java/org/openecomp/dcae/apod/analytics/dmaap/service/publisher/DMaaPMRPublisher.java
+++ b/dcae-analytics-dmaap/src/main/java/org/openecomp/dcae/apod/analytics/dmaap/service/publisher/DMaaPMRPublisher.java
@@ -1,95 +1,95 @@
-/*
- * ===============================LICENSE_START======================================
- * dcae-analytics
- * ================================================================================
- * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============================LICENSE_END===========================================
- */
-
-package org.openecomp.dcae.apod.analytics.dmaap.service.publisher;
-
-import org.openecomp.dcae.apod.analytics.common.exception.DCAEAnalyticsRuntimeException;
-import org.openecomp.dcae.apod.analytics.dmaap.domain.config.DMaaPMRPublisherConfig;
-import org.openecomp.dcae.apod.analytics.dmaap.domain.response.DMaaPMRPublisherResponse;
-
-import java.util.Date;
-import java.util.List;
-
-/**
- * <p>
- * DMaaP MR Publisher can be used to publish messages to DMaaP MR Topics.
- * <p>
- *
- * @author Rajiv Singla . Creation Date: 10/13/2016.
- */
-public interface DMaaPMRPublisher extends AutoCloseable {
-
-
- /**
- * <p>
- * Adds collection of messages to DMaaP MR Topic Publishing Queue.
- * <p>
- * Note: Invoking this method may or may not cause publishing immediately
- * as publishing in done is batch mode by default. Parameter maxBatchSize
- * in {@link DMaaPMRPublisherConfig} is used to determine max batch queue size.
- * If the maxBatchSize is reached all message will be published automatically
- * during subsequent call.
- * </p>
- *
- * @param messages messages to publish to DMaaP MR Publisher
- * @return response which may contain Http Response code 202 (Accepted) as publishing
- * will proceed when max batch size is reached. Throws {@link DCAEAnalyticsRuntimeException}
- * if publishing fails
- */
- DMaaPMRPublisherResponse publish(List<String> messages);
-
-
- /**
- * <p>
- * Forces publishing of messages to DMaaP MR Topic and returns {@link DMaaPMRPublisherResponse}
- * which can be inspected for HTTP status code of publishing call to DMaaP MR Topic.
- * </p>
- *
- * @param messages messages to publish to DMaaP MR Publisher
- * @return DMaaP Message Router Publisher Response. Throws {@link DCAEAnalyticsRuntimeException}
- * if force publishing fails
- *
- */
- DMaaPMRPublisherResponse forcePublish(List<String> messages);
-
-
- /**
- * <p>
- * Forces publishing of messages in Publisher queue to DMaaP MR Topic and returns
- * {@link DMaaPMRPublisherResponse}.If there are no messages were in the queue to
- * be flushed response code 304 (Not Modified) will be returned
- * </p>
- *
- * @return DMaaP Message Router Publisher Response
- */
- DMaaPMRPublisherResponse flush();
-
-
- /**
- * <p>
- * Returns the creation time when Publisher instance was created.
- * <p>
- *
- * @return creation time of Subscriber instance
- */
- Date getPublisherCreationTime();
-
-
-}
+/*
+ * ===============================LICENSE_START======================================
+ * dcae-analytics
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============================LICENSE_END===========================================
+ */
+
+package org.openecomp.dcae.apod.analytics.dmaap.service.publisher;
+
+import org.openecomp.dcae.apod.analytics.common.exception.DCAEAnalyticsRuntimeException;
+import org.openecomp.dcae.apod.analytics.dmaap.domain.config.DMaaPMRPublisherConfig;
+import org.openecomp.dcae.apod.analytics.dmaap.domain.response.DMaaPMRPublisherResponse;
+
+import java.util.Date;
+import java.util.List;
+
+/**
+ * <p>
+ * DMaaP MR Publisher can be used to publish messages to DMaaP MR Topics.
+ * <p>
+ *
+ * @author Rajiv Singla . Creation Date: 10/13/2016.
+ */
+public interface DMaaPMRPublisher extends AutoCloseable {
+
+
+ /**
+ * <p>
+ * Adds collection of messages to DMaaP MR Topic Publishing Queue.
+ * <p>
+ * Note: Invoking this method may or may not cause publishing immediately
+ * as publishing in done is batch mode by default. Parameter maxBatchSize
+ * in {@link DMaaPMRPublisherConfig} is used to determine max batch queue size.
+ * If the maxBatchSize is reached all message will be published automatically
+ * during subsequent call.
+ * </p>
+ *
+ * @param messages messages to publish to DMaaP MR Publisher
+ * @return response which may contain Http Response code 202 (Accepted) as publishing
+ * will proceed when max batch size is reached. Throws {@link DCAEAnalyticsRuntimeException}
+ * if publishing fails
+ */
+ DMaaPMRPublisherResponse publish(List<String> messages);
+
+
+ /**
+ * <p>
+ * Forces publishing of messages to DMaaP MR Topic and returns {@link DMaaPMRPublisherResponse}
+ * which can be inspected for HTTP status code of publishing call to DMaaP MR Topic.
+ * </p>
+ *
+ * @param messages messages to publish to DMaaP MR Publisher
+ * @return DMaaP Message Router Publisher Response. Throws {@link DCAEAnalyticsRuntimeException}
+ * if force publishing fails
+ *
+ */
+ DMaaPMRPublisherResponse forcePublish(List<String> messages);
+
+
+ /**
+ * <p>
+ * Forces publishing of messages in Publisher queue to DMaaP MR Topic and returns
+ * {@link DMaaPMRPublisherResponse}.If there are no messages were in the queue to
+ * be flushed response code 304 (Not Modified) will be returned
+ * </p>
+ *
+ * @return DMaaP Message Router Publisher Response
+ */
+ DMaaPMRPublisherResponse flush();
+
+
+ /**
+ * <p>
+ * Returns the creation time when Publisher instance was created.
+ * <p>
+ *
+ * @return creation time of Subscriber instance
+ */
+ Date getPublisherCreationTime();
+
+
+}
diff --git a/dcae-analytics-dmaap/src/main/java/org/openecomp/dcae/apod/analytics/dmaap/service/publisher/DMaaPMRPublisherFactory.java b/dcae-analytics-dmaap/src/main/java/org/openecomp/dcae/apod/analytics/dmaap/service/publisher/DMaaPMRPublisherFactory.java
index 0e71559..5d6dfa1 100644
--- a/dcae-analytics-dmaap/src/main/java/org/openecomp/dcae/apod/analytics/dmaap/service/publisher/DMaaPMRPublisherFactory.java
+++ b/dcae-analytics-dmaap/src/main/java/org/openecomp/dcae/apod/analytics/dmaap/service/publisher/DMaaPMRPublisherFactory.java
@@ -1,49 +1,49 @@
-/*
- * ===============================LICENSE_START======================================
- * dcae-analytics
- * ================================================================================
- * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============================LICENSE_END===========================================
- */
-
-package org.openecomp.dcae.apod.analytics.dmaap.service.publisher;
-
-import org.openecomp.dcae.apod.analytics.dmaap.DMaaPMRFactory;
-import org.openecomp.dcae.apod.analytics.dmaap.domain.config.DMaaPMRPublisherConfig;
-
-/**
- * <p>
- * Factory to initialize instance of {@link DMaaPMRPublisher} for Guice DI injection purposes.
- * <p>
- * <strong>
- * NOTE: Client should not use this Factory to initialize {@link DMaaPMRPublisher} unless they
- * are wiring dependencies using Guice. Client must use {@link DMaaPMRFactory} to initialize
- * guice injected Publisher instances
- * </strong>
- * <p>
- * @author Rajiv Singla . Creation Date: 10/20/2016.
- */
-public interface DMaaPMRPublisherFactory {
-
- /**
- * Guice Factory to create DMaaP MR Publisher
- *
- * @param publisherConfig publisher config
- *
- * @return DMaaP MR Publisher instance
- */
- DMaaPMRPublisher create(DMaaPMRPublisherConfig publisherConfig);
-
-}
+/*
+ * ===============================LICENSE_START======================================
+ * dcae-analytics
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============================LICENSE_END===========================================
+ */
+
+package org.openecomp.dcae.apod.analytics.dmaap.service.publisher;
+
+import org.openecomp.dcae.apod.analytics.dmaap.DMaaPMRFactory;
+import org.openecomp.dcae.apod.analytics.dmaap.domain.config.DMaaPMRPublisherConfig;
+
+/**
+ * <p>
+ * Factory to initialize instance of {@link DMaaPMRPublisher} for Guice DI injection purposes.
+ * <p>
+ * <strong>
+ * NOTE: Client should not use this Factory to initialize {@link DMaaPMRPublisher} unless they
+ * are wiring dependencies using Guice. Client must use {@link DMaaPMRFactory} to initialize
+ * guice injected Publisher instances
+ * </strong>
+ * <p>
+ * @author Rajiv Singla . Creation Date: 10/20/2016.
+ */
+public interface DMaaPMRPublisherFactory {
+
+ /**
+ * Guice Factory to create DMaaP MR Publisher
+ *
+ * @param publisherConfig publisher config
+ *
+ * @return DMaaP MR Publisher instance
+ */
+ DMaaPMRPublisher create(DMaaPMRPublisherConfig publisherConfig);
+
+}
diff --git a/dcae-analytics-dmaap/src/main/java/org/openecomp/dcae/apod/analytics/dmaap/service/publisher/DMaaPMRPublisherImpl.java b/dcae-analytics-dmaap/src/main/java/org/openecomp/dcae/apod/analytics/dmaap/service/publisher/DMaaPMRPublisherImpl.java
index 1901189..b3e303e 100644
--- a/dcae-analytics-dmaap/src/main/java/org/openecomp/dcae/apod/analytics/dmaap/service/publisher/DMaaPMRPublisherImpl.java
+++ b/dcae-analytics-dmaap/src/main/java/org/openecomp/dcae/apod/analytics/dmaap/service/publisher/DMaaPMRPublisherImpl.java
@@ -1,209 +1,209 @@
-/*
- * ===============================LICENSE_START======================================
- * dcae-analytics
- * ================================================================================
- * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============================LICENSE_END===========================================
- */
-
-package org.openecomp.dcae.apod.analytics.dmaap.service.publisher;
-
-import com.google.common.base.Optional;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-import com.google.inject.Inject;
-import com.google.inject.assistedinject.Assisted;
-import org.apache.commons.lang3.tuple.Pair;
-import org.apache.http.HttpHeaders;
-import org.apache.http.client.HttpClient;
-import org.apache.http.client.methods.HttpPost;
-import org.apache.http.entity.ContentType;
-import org.apache.http.entity.StringEntity;
-import org.apache.http.impl.client.CloseableHttpClient;
-import org.openecomp.dcae.apod.analytics.common.AnalyticsConstants;
-import org.openecomp.dcae.apod.analytics.common.exception.DCAEAnalyticsRuntimeException;
-import org.openecomp.dcae.apod.analytics.dmaap.domain.config.DMaaPMRPublisherConfig;
-import org.openecomp.dcae.apod.analytics.dmaap.domain.response.DMaaPMRPublisherResponse;
-import org.openecomp.dcae.apod.analytics.dmaap.service.BaseDMaaPMRComponent;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.net.URI;
-import java.util.Date;
-import java.util.List;
-
-import static org.openecomp.dcae.apod.analytics.common.utils.HTTPUtils.isSuccessfulResponseCode;
-import static java.lang.String.format;
-
-/**
- * Concrete Implementation of {@link DMaaPMRPublisher} which uses {@link HttpClient}
- *
- * @author Rajiv Singla . Creation Date: 10/13/2016.
- */
-public class DMaaPMRPublisherImpl extends BaseDMaaPMRComponent implements DMaaPMRPublisher {
-
- private static final Logger LOG = LoggerFactory.getLogger(DMaaPMRPublisherImpl.class);
-
- private final DMaaPMRPublisherConfig publisherConfig;
- private final CloseableHttpClient closeableHttpClient;
- private final DMaaPMRPublisherQueue publisherQueue;
- private final Date publisherCreationTime;
- private URI publisherUri;
-
- @Inject
- public DMaaPMRPublisherImpl(@Assisted DMaaPMRPublisherConfig publisherConfig,
- DMaaPMRPublisherQueueFactory dMaaPMRPublisherQueueFactory,
- CloseableHttpClient closeableHttpClient) {
-
- this.publisherConfig = publisherConfig;
- this.publisherQueue = dMaaPMRPublisherQueueFactory.create(
- publisherConfig.getMaxBatchSize(), publisherConfig.getMaxRecoveryQueueSize());
- this.closeableHttpClient = closeableHttpClient;
- this.publisherUri = createPublisherURI(publisherConfig);
- this.publisherCreationTime = new Date();
- }
-
-
- @Override
- public DMaaPMRPublisherResponse publish(List<String> messages) {
-
- final int batchQueueRemainingSize = publisherQueue.getBatchQueueRemainingSize();
-
- // if messages size is less than batch queue size - just queue them for batch publishing
- if (batchQueueRemainingSize > messages.size()) {
- LOG.debug("Adding messages to batch Queue. No flushing required. Messages Size:{}. Batch Queue Size:{}",
- messages.size(), batchQueueRemainingSize);
- final int batchQueueSize = publisherQueue.addBatchMessages(messages);
- return createPublisherAcceptedResponse(batchQueueSize);
-
- } else {
-
- // grab all already queued messages, append current messages and force publish them to DMaaP MR topic
- final List<String> queueMessages = publisherQueue.getMessageForPublishing();
- LOG.debug("Batch Queue capacity exceeds messages size. Flushing of all pending messages to DMaaP MR " +
- "Publisher Topic.");
- return forcePublish(Lists.newLinkedList(Iterables.concat(queueMessages, messages)));
- }
-
- }
-
- @Override
- public DMaaPMRPublisherResponse forcePublish(List<String> messages) {
-
- LOG.debug("Force publishing messages to DMaaP MR Topic. Messages Size: {}", messages.size());
-
- final String contentType = publisherConfig.getContentType();
- final String userName = publisherConfig.getUserName();
- final String userPassword = publisherConfig.getUserPassword();
- final HttpPost postRequest = new HttpPost(publisherUri);
-
- // add Authorization Header if username and password are present
- final Optional<String> authHeader = getAuthHeader(userName, userPassword);
- if (authHeader.isPresent()) {
- postRequest.addHeader(HttpHeaders.AUTHORIZATION, authHeader.get());
- } else {
- LOG.debug("DMaaP MR Publisher Authentication is disabled as username or password is not present.");
- }
-
- // Create post string entity
- final String messagesJson = convertToJsonString(messages);
- final StringEntity requestEntity =
- new StringEntity(messagesJson, ContentType.create(contentType, "UTF-8"));
- postRequest.setEntity(requestEntity);
-
- try {
- final Pair<Integer, String> responsePair = closeableHttpClient.execute(postRequest, responseHandler());
- final Integer responseCode = responsePair.getLeft();
- final String responseBody = responsePair.getRight();
- // if messages were published successfully, return successful response
- if (isSuccessfulResponseCode(responseCode)) {
- LOG.debug("DMaaP MR Messages published successfully. DMaaP Response Code: {}. DMaaP Response " +
- "Body: {}, Number of Messages published: {}",
- responseCode, responseBody, messages.size());
-
- } else {
- LOG.warn("Unable to publish messages to DMaaP MR Topic. DMaaP Response Code: {}, DMaaP Response " +
- "Body: {}. Messages will be queued in recovery queue", responseCode, responseBody);
- addMessagesToRecoveryQueue(publisherQueue, messages);
- }
-
- return createPublisherResponse(responseCode, responseBody,
- getPendingMessages(publisherQueue, publisherConfig));
-
- } catch (IOException e) {
- // If IO Error then we need to also put messages in recovery queue
- addMessagesToRecoveryQueue(publisherQueue, messages);
- final String errorMessage = format("IO Exception while publishing messages to DMaaP Topic. " +
- "Messages will be queued in recovery queue. Messages Size: %d", messages.size());
-
- throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e);
- }
-
- }
-
-
- @Override
- public DMaaPMRPublisherResponse flush() {
- final List<String> queueMessages = publisherQueue.getMessageForPublishing();
- // If there are no message return 204 (No Content) response code
- if (queueMessages.isEmpty()) {
- LOG.debug("No messages to publish to batch queue. Returning 204 status code");
- return createPublisherNoContentResponse();
- } else {
- // force publish messages in queue
- return forcePublish(queueMessages);
- }
- }
-
- @Override
- public Date getPublisherCreationTime() {
- return new Date(publisherCreationTime.getTime());
- }
-
- @Override
- public void close() throws Exception {
-
- // flush current message in the queue
- int retrialNumber = 0;
- int flushResponseCode;
-
- // automatic retries if messages cannot be flushed
- do {
- retrialNumber++;
- DMaaPMRPublisherResponse flushResponse = flush();
- flushResponseCode = flushResponse.getResponseCode();
-
- if (!isSuccessfulResponseCode(flushResponseCode)) {
- LOG.warn("Unable to flush batch messages to publisher due to DMaaP MR invalid Response: {}. " +
- "Retrial No: {} of Max {} Retries", flushResponseCode, retrialNumber,
- AnalyticsConstants.PUBLISHER_MAX_FLUSH_RETRIES_ON_CLOSE);
-
- Thread.sleep(AnalyticsConstants.PUBLISHER_DELAY_MS_ON_RETRIES_ON_CLOSE);
- }
- } while (retrialNumber <= AnalyticsConstants.PUBLISHER_MAX_FLUSH_RETRIES_ON_CLOSE &&
- !isSuccessfulResponseCode(flushResponseCode));
-
- if (!isSuccessfulResponseCode(flushResponseCode)) {
- LOG.error("Unable to flush batch messages to publisher. Messages loss cannot be prevented");
- } else {
- LOG.info("Successfully published all batched messages to publisher.");
- }
-
- // close http client
- closeableHttpClient.close();
-
- }
-}
+/*
+ * ===============================LICENSE_START======================================
+ * dcae-analytics
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============================LICENSE_END===========================================
+ */
+
+package org.openecomp.dcae.apod.analytics.dmaap.service.publisher;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.inject.Inject;
+import com.google.inject.assistedinject.Assisted;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.http.HttpHeaders;
+import org.apache.http.client.HttpClient;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.entity.ContentType;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.openecomp.dcae.apod.analytics.common.AnalyticsConstants;
+import org.openecomp.dcae.apod.analytics.common.exception.DCAEAnalyticsRuntimeException;
+import org.openecomp.dcae.apod.analytics.dmaap.domain.config.DMaaPMRPublisherConfig;
+import org.openecomp.dcae.apod.analytics.dmaap.domain.response.DMaaPMRPublisherResponse;
+import org.openecomp.dcae.apod.analytics.dmaap.service.BaseDMaaPMRComponent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Date;
+import java.util.List;
+
+import static org.openecomp.dcae.apod.analytics.common.utils.HTTPUtils.isSuccessfulResponseCode;
+import static java.lang.String.format;
+
+/**
+ * Concrete Implementation of {@link DMaaPMRPublisher} which uses {@link HttpClient}
+ *
+ * @author Rajiv Singla . Creation Date: 10/13/2016.
+ */
+public class DMaaPMRPublisherImpl extends BaseDMaaPMRComponent implements DMaaPMRPublisher {
+
+ private static final Logger LOG = LoggerFactory.getLogger(DMaaPMRPublisherImpl.class);
+
+ private final DMaaPMRPublisherConfig publisherConfig;
+ private final CloseableHttpClient closeableHttpClient;
+ private final DMaaPMRPublisherQueue publisherQueue;
+ private final Date publisherCreationTime;
+ private URI publisherUri;
+
+ @Inject
+ public DMaaPMRPublisherImpl(@Assisted DMaaPMRPublisherConfig publisherConfig,
+ DMaaPMRPublisherQueueFactory dMaaPMRPublisherQueueFactory,
+ CloseableHttpClient closeableHttpClient) {
+
+ this.publisherConfig = publisherConfig;
+ this.publisherQueue = dMaaPMRPublisherQueueFactory.create(
+ publisherConfig.getMaxBatchSize(), publisherConfig.getMaxRecoveryQueueSize());
+ this.closeableHttpClient = closeableHttpClient;
+ this.publisherUri = createPublisherURI(publisherConfig);
+ this.publisherCreationTime = new Date();
+ }
+
+
+ @Override
+ public DMaaPMRPublisherResponse publish(List<String> messages) {
+
+ final int batchQueueRemainingSize = publisherQueue.getBatchQueueRemainingSize();
+
+ // if messages size is less than batch queue size - just queue them for batch publishing
+ if (batchQueueRemainingSize > messages.size()) {
+ LOG.debug("Adding messages to batch Queue. No flushing required. Messages Size:{}. Batch Queue Size:{}",
+ messages.size(), batchQueueRemainingSize);
+ final int batchQueueSize = publisherQueue.addBatchMessages(messages);
+ return createPublisherAcceptedResponse(batchQueueSize);
+
+ } else {
+
+ // grab all already queued messages, append current messages and force publish them to DMaaP MR topic
+ final List<String> queueMessages = publisherQueue.getMessageForPublishing();
+ LOG.debug("Batch Queue capacity exceeds messages size. Flushing of all pending messages to DMaaP MR " +
+ "Publisher Topic.");
+ return forcePublish(Lists.newLinkedList(Iterables.concat(queueMessages, messages)));
+ }
+
+ }
+
+ @Override
+ public DMaaPMRPublisherResponse forcePublish(List<String> messages) {
+
+ LOG.debug("Force publishing messages to DMaaP MR Topic. Messages Size: {}", messages.size());
+
+ final String contentType = publisherConfig.getContentType();
+ final String userName = publisherConfig.getUserName();
+ final String userPassword = publisherConfig.getUserPassword();
+ final HttpPost postRequest = new HttpPost(publisherUri);
+
+ // add Authorization Header if username and password are present
+ final Optional<String> authHeader = getAuthHeader(userName, userPassword);
+ if (authHeader.isPresent()) {
+ postRequest.addHeader(HttpHeaders.AUTHORIZATION, authHeader.get());
+ } else {
+ LOG.debug("DMaaP MR Publisher Authentication is disabled as username or password is not present.");
+ }
+
+ // Create post string entity
+ final String messagesJson = convertToJsonString(messages);
+ final StringEntity requestEntity =
+ new StringEntity(messagesJson, ContentType.create(contentType, "UTF-8"));
+ postRequest.setEntity(requestEntity);
+
+ try {
+ final Pair<Integer, String> responsePair = closeableHttpClient.execute(postRequest, responseHandler());
+ final Integer responseCode = responsePair.getLeft();
+ final String responseBody = responsePair.getRight();
+ // if messages were published successfully, return successful response
+ if (isSuccessfulResponseCode(responseCode)) {
+ LOG.debug("DMaaP MR Messages published successfully. DMaaP Response Code: {}. DMaaP Response " +
+ "Body: {}, Number of Messages published: {}",
+ responseCode, responseBody, messages.size());
+
+ } else {
+ LOG.warn("Unable to publish messages to DMaaP MR Topic. DMaaP Response Code: {}, DMaaP Response " +
+ "Body: {}. Messages will be queued in recovery queue", responseCode, responseBody);
+ addMessagesToRecoveryQueue(publisherQueue, messages);
+ }
+
+ return createPublisherResponse(responseCode, responseBody,
+ getPendingMessages(publisherQueue, publisherConfig));
+
+ } catch (IOException e) {
+ // If IO Error then we need to also put messages in recovery queue
+ addMessagesToRecoveryQueue(publisherQueue, messages);
+ final String errorMessage = format("IO Exception while publishing messages to DMaaP Topic. " +
+ "Messages will be queued in recovery queue. Messages Size: %d", messages.size());
+
+ throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e);
+ }
+
+ }
+
+
+ @Override
+ public DMaaPMRPublisherResponse flush() {
+ final List<String> queueMessages = publisherQueue.getMessageForPublishing();
+ // If there are no message return 204 (No Content) response code
+ if (queueMessages.isEmpty()) {
+ LOG.debug("No messages to publish to batch queue. Returning 204 status code");
+ return createPublisherNoContentResponse();
+ } else {
+ // force publish messages in queue
+ return forcePublish(queueMessages);
+ }
+ }
+
+ @Override
+ public Date getPublisherCreationTime() {
+ return new Date(publisherCreationTime.getTime());
+ }
+
+ @Override
+ public void close() throws Exception {
+
+ // flush current message in the queue
+ int retrialNumber = 0;
+ int flushResponseCode;
+
+ // automatic retries if messages cannot be flushed
+ do {
+ retrialNumber++;
+ DMaaPMRPublisherResponse flushResponse = flush();
+ flushResponseCode = flushResponse.getResponseCode();
+
+ if (!isSuccessfulResponseCode(flushResponseCode)) {
+ LOG.warn("Unable to flush batch messages to publisher due to DMaaP MR invalid Response: {}. " +
+ "Retrial No: {} of Max {} Retries", flushResponseCode, retrialNumber,
+ AnalyticsConstants.PUBLISHER_MAX_FLUSH_RETRIES_ON_CLOSE);
+
+ Thread.sleep(AnalyticsConstants.PUBLISHER_DELAY_MS_ON_RETRIES_ON_CLOSE);
+ }
+ } while (retrialNumber <= AnalyticsConstants.PUBLISHER_MAX_FLUSH_RETRIES_ON_CLOSE &&
+ !isSuccessfulResponseCode(flushResponseCode));
+
+ if (!isSuccessfulResponseCode(flushResponseCode)) {
+ LOG.error("Unable to flush batch messages to publisher. Messages loss cannot be prevented");
+ } else {
+ LOG.info("Successfully published all batched messages to publisher.");
+ }
+
+ // close http client
+ closeableHttpClient.close();
+
+ }
+}
diff --git a/dcae-analytics-dmaap/src/main/java/org/openecomp/dcae/apod/analytics/dmaap/service/publisher/DMaaPMRPublisherQueue.java b/dcae-analytics-dmaap/src/main/java/org/openecomp/dcae/apod/analytics/dmaap/service/publisher/DMaaPMRPublisherQueue.java
index 281af53..3877f0b 100644
--- a/dcae-analytics-dmaap/src/main/java/org/openecomp/dcae/apod/analytics/dmaap/service/publisher/DMaaPMRPublisherQueue.java
+++ b/dcae-analytics-dmaap/src/main/java/org/openecomp/dcae/apod/analytics/dmaap/service/publisher/DMaaPMRPublisherQueue.java
@@ -1,87 +1,87 @@
-/*
- * ===============================LICENSE_START======================================
- * dcae-analytics
- * ================================================================================
- * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============================LICENSE_END===========================================
- */
-
-package org.openecomp.dcae.apod.analytics.dmaap.service.publisher;
-
-import java.util.List;
-
-/**
- * <p>
- * DMaaP MR Publisher Queue handles back pressure in case DMaaP MR Publisher topic
- * is offline for some reason. It does so by having a recovery queue which keeps
- * messages in order in case there is temporary interruption in DMaaP Publisher
- * </p>
- *
- * @author Rajiv Singla . Creation Date: 11/1/2016.
- */
-public interface DMaaPMRPublisherQueue {
-
- /**
- * <p>
- * Add batchMessages to Batch Queue
- * </p>
- *
- * @param batchMessages messages that needs to be added to batch queue
- * @return current size of batch queue. Throws {@link IllegalStateException}
- * if batch queue does not have enough space
- */
- int addBatchMessages(List<String> batchMessages);
-
-
- /**
- * <p>
- * Add recoverable messages to Recoverable Queue
- * </p>
- *
- * @param recoverableMessages messages that needs to be added to recoverable queue
- * @return current size of the recoverable queue. Throws {@link IllegalStateException}
- * if recoverable queue does not have enough space
- */
- int addRecoverableMessages(List<String> recoverableMessages);
-
- /**
- * <p>
- * Get messages that need to be published to DMaaP topic. Messages in recoverable
- * queue are appended if present.
- * </p>
- *
- * @return List of messages from both batch and recovery queue
- */
- List<String> getMessageForPublishing();
-
- /**
- * <p>
- * Remaining capacity of Batch Queue
- * </p>
- *
- * @return Remaining Batch Queue Size
- */
- int getBatchQueueRemainingSize();
-
- /**
- * <p>
- * Remaining capacity of Recovery Queue
- * </p>
- *
- * @return Remaining Recovery Queue Size
- */
- int getRecoveryQueueRemainingSize();
-
-}
+/*
+ * ===============================LICENSE_START======================================
+ * dcae-analytics
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============================LICENSE_END===========================================
+ */
+
+package org.openecomp.dcae.apod.analytics.dmaap.service.publisher;
+
+import java.util.List;
+
+/**
+ * <p>
+ * DMaaP MR Publisher Queue handles back pressure in case DMaaP MR Publisher topic
+ * is offline for some reason. It does so by having a recovery queue which keeps
+ * messages in order in case there is temporary interruption in DMaaP Publisher
+ * </p>
+ *
+ * @author Rajiv Singla . Creation Date: 11/1/2016.
+ */
+public interface DMaaPMRPublisherQueue {
+
+ /**
+ * <p>
+ * Add batchMessages to Batch Queue
+ * </p>
+ *
+ * @param batchMessages messages that needs to be added to batch queue
+ * @return current size of batch queue. Throws {@link IllegalStateException}
+ * if batch queue does not have enough space
+ */
+ int addBatchMessages(List<String> batchMessages);
+
+
+ /**
+ * <p>
+ * Add recoverable messages to Recoverable Queue
+ * </p>
+ *
+ * @param recoverableMessages messages that needs to be added to recoverable queue
+ * @return current size of the recoverable queue. Throws {@link IllegalStateException}
+ * if recoverable queue does not have enough space
+ */
+ int addRecoverableMessages(List<String> recoverableMessages);
+
+ /**
+ * <p>
+ * Get messages that need to be published to DMaaP topic. Messages in recoverable
+ * queue are appended if present.
+ * </p>
+ *
+ * @return List of messages from both batch and recovery queue
+ */
+ List<String> getMessageForPublishing();
+
+ /**
+ * <p>
+ * Remaining capacity of Batch Queue
+ * </p>
+ *
+ * @return Remaining Batch Queue Size
+ */
+ int getBatchQueueRemainingSize();
+
+ /**
+ * <p>
+ * Remaining capacity of Recovery Queue
+ * </p>
+ *
+ * @return Remaining Recovery Queue Size
+ */
+ int getRecoveryQueueRemainingSize();
+
+}
diff --git a/dcae-analytics-dmaap/src/main/java/org/openecomp/dcae/apod/analytics/dmaap/service/publisher/DMaaPMRPublisherQueueFactory.java b/dcae-analytics-dmaap/src/main/java/org/openecomp/dcae/apod/analytics/dmaap/service/publisher/DMaaPMRPublisherQueueFactory.java
index 3ebc9dc..8d44f93 100644
--- a/dcae-analytics-dmaap/src/main/java/org/openecomp/dcae/apod/analytics/dmaap/service/publisher/DMaaPMRPublisherQueueFactory.java
+++ b/dcae-analytics-dmaap/src/main/java/org/openecomp/dcae/apod/analytics/dmaap/service/publisher/DMaaPMRPublisherQueueFactory.java
@@ -1,45 +1,45 @@
-/*
- * ===============================LICENSE_START======================================
- * dcae-analytics
- * ================================================================================
- * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============================LICENSE_END===========================================
- */
-
-package org.openecomp.dcae.apod.analytics.dmaap.service.publisher;
-
-import com.google.inject.assistedinject.Assisted;
-
-/**
- * <p>
- * Factory to initialize instance of {@link DMaaPMRPublisherQueue} for Guice DI injection purposes.
- * <p>
- *
- * @author Rajiv Singla . Creation Date: 11/1/2016.
- */
-public interface DMaaPMRPublisherQueueFactory {
-
- /**
- * Guice Factory to create DMaaP MR Publisher Queue
- *
- * @param batchQueueSize batch queue size
- * @param recoveryQueueSize recovery queue size
- *
- * @return instance of DMaaP MR Publisher Queue
- */
- DMaaPMRPublisherQueue create(@Assisted("batchQueueSize") int batchQueueSize,
- @Assisted("recoveryQueueSize") int recoveryQueueSize);
-
-}
+/*
+ * ===============================LICENSE_START======================================
+ * dcae-analytics
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============================LICENSE_END===========================================
+ */
+
+package org.openecomp.dcae.apod.analytics.dmaap.service.publisher;
+
+import com.google.inject.assistedinject.Assisted;
+
+/**
+ * <p>
+ * Factory to initialize instance of {@link DMaaPMRPublisherQueue} for Guice DI injection purposes.
+ * <p>
+ *
+ * @author Rajiv Singla . Creation Date: 11/1/2016.
+ */
+public interface DMaaPMRPublisherQueueFactory {
+
+ /**
+ * Guice Factory to create DMaaP MR Publisher Queue
+ *
+ * @param batchQueueSize batch queue size
+ * @param recoveryQueueSize recovery queue size
+ *
+ * @return instance of DMaaP MR Publisher Queue
+ */
+ DMaaPMRPublisherQueue create(@Assisted("batchQueueSize") int batchQueueSize,
+ @Assisted("recoveryQueueSize") int recoveryQueueSize);
+
+}
diff --git a/dcae-analytics-dmaap/src/main/java/org/openecomp/dcae/apod/analytics/dmaap/service/publisher/DMaaPMRPublisherQueueImpl.java b/dcae-analytics-dmaap/src/main/java/org/openecomp/dcae/apod/analytics/dmaap/service/publisher/DMaaPMRPublisherQueueImpl.java
index 936abe3..e42b6b0 100644
--- a/dcae-analytics-dmaap/src/main/java/org/openecomp/dcae/apod/analytics/dmaap/service/publisher/DMaaPMRPublisherQueueImpl.java
+++ b/dcae-analytics-dmaap/src/main/java/org/openecomp/dcae/apod/analytics/dmaap/service/publisher/DMaaPMRPublisherQueueImpl.java
@@ -1,126 +1,126 @@
-/*
- * ===============================LICENSE_START======================================
- * dcae-analytics
- * ================================================================================
- * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============================LICENSE_END===========================================
- */
-
-package org.openecomp.dcae.apod.analytics.dmaap.service.publisher;
-
-import com.google.inject.Inject;
-import com.google.inject.assistedinject.Assisted;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.LinkedList;
-import java.util.List;
-import java.util.concurrent.LinkedBlockingDeque;
-
-import static com.google.common.collect.Iterables.concat;
-import static com.google.common.collect.Lists.newLinkedList;
-import static java.util.Collections.unmodifiableList;
-
-/**
- * <p>
- * An implementation of {@link DMaaPMRPublisherQueue} which uses {@link java.util.concurrent.BlockingDeque}
- * for batch and recovery queues
- * </p>
- *
- *
- * @author Rajiv Singla . Creation Date: 11/1/2016.
- */
-public class DMaaPMRPublisherQueueImpl implements DMaaPMRPublisherQueue {
-
- private static final Logger LOG = LoggerFactory.getLogger(DMaaPMRPublisherQueueImpl.class);
-
- private final LinkedBlockingDeque<String> batchQueue;
- private final LinkedBlockingDeque<String> recoveryQueue;
-
- @Inject
- public DMaaPMRPublisherQueueImpl(@Assisted("batchQueueSize") int batchQueueSize,
- @Assisted("recoveryQueueSize") int recoveryQueueSize) {
- batchQueue = new LinkedBlockingDeque<>(batchQueueSize);
- recoveryQueue = new LinkedBlockingDeque<>(recoveryQueueSize);
- LOG.debug("Creating Instance of DMaaP Publisher Queue. BatchQueueSize: {}, RecoveryQueueSize: {}",
- batchQueueSize, recoveryQueueSize);
- }
-
- @Override
- public synchronized int addBatchMessages(List<String> batchMessages) {
-
- // checks if batchMessages size does not exceed batch queue capacity
- if (batchMessages.size() > batchQueue.remainingCapacity()) {
- throw new IllegalStateException("Not enough capacity to add batchMessages in batch queue");
- }
-
- // Add batchMessages to batch queue
- for (String message : batchMessages) {
- batchQueue.add(message);
- }
-
- // returns current elements size in batch queue
- return batchQueue.size();
- }
-
- @Override
- public synchronized int addRecoverableMessages(List<String> recoverableMessages) {
-
- // checks if messages size does not exceed recovery queue size
- if (recoverableMessages.size() > recoveryQueue.remainingCapacity()) {
- throw new IllegalStateException("Not enough capacity to add messages in recovery queue");
- }
-
- // add messages to recovery queue
- for (String recoverableMessage : recoverableMessages) {
- recoveryQueue.add(recoverableMessage);
- }
-
- // returns current size of recovery queue
- return recoveryQueue.size();
- }
-
- @Override
- public synchronized List<String> getMessageForPublishing() {
-
- final List<String> recoveryMessageList = new LinkedList<>();
- final List<String> batchMessagesList = new LinkedList<>();
-
- // get messages from recovery queue if present
- if (!recoveryQueue.isEmpty()) {
- final int recoveryQueueSize = recoveryQueue.drainTo(recoveryMessageList);
- LOG.debug("Drained Recovery Queue elements for flushing: {}", recoveryQueueSize);
- }
-
- // get messages from batch queue if present
- if (!batchQueue.isEmpty()) {
- final int batchQueueSize = batchQueue.drainTo(batchMessagesList);
- LOG.debug("Drained Batch Queue elements for flushing: {}", batchQueueSize);
- }
-
- // concat recovery and batch queue elements
- return unmodifiableList(newLinkedList(concat(recoveryMessageList, batchMessagesList)));
- }
-
- @Override
- public synchronized int getBatchQueueRemainingSize() {
- return batchQueue.remainingCapacity();
- }
-
- @Override
- public synchronized int getRecoveryQueueRemainingSize() {
- return recoveryQueue.remainingCapacity();
- }
-}
+/*
+ * ===============================LICENSE_START======================================
+ * dcae-analytics
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============================LICENSE_END===========================================
+ */
+
+package org.openecomp.dcae.apod.analytics.dmaap.service.publisher;
+
+import com.google.inject.Inject;
+import com.google.inject.assistedinject.Assisted;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.LinkedBlockingDeque;
+
+import static com.google.common.collect.Iterables.concat;
+import static com.google.common.collect.Lists.newLinkedList;
+import static java.util.Collections.unmodifiableList;
+
+/**
+ * <p>
+ * An implementation of {@link DMaaPMRPublisherQueue} which uses {@link java.util.concurrent.BlockingDeque}
+ * for batch and recovery queues
+ * </p>
+ *
+ *
+ * @author Rajiv Singla . Creation Date: 11/1/2016.
+ */
+public class DMaaPMRPublisherQueueImpl implements DMaaPMRPublisherQueue {
+
+ private static final Logger LOG = LoggerFactory.getLogger(DMaaPMRPublisherQueueImpl.class);
+
+ private final LinkedBlockingDeque<String> batchQueue;
+ private final LinkedBlockingDeque<String> recoveryQueue;
+
+ @Inject
+ public DMaaPMRPublisherQueueImpl(@Assisted("batchQueueSize") int batchQueueSize,
+ @Assisted("recoveryQueueSize") int recoveryQueueSize) {
+ batchQueue = new LinkedBlockingDeque<>(batchQueueSize);
+ recoveryQueue = new LinkedBlockingDeque<>(recoveryQueueSize);
+ LOG.debug("Creating Instance of DMaaP Publisher Queue. BatchQueueSize: {}, RecoveryQueueSize: {}",
+ batchQueueSize, recoveryQueueSize);
+ }
+
+ @Override
+ public synchronized int addBatchMessages(List<String> batchMessages) {
+
+ // checks if batchMessages size does not exceed batch queue capacity
+ if (batchMessages.size() > batchQueue.remainingCapacity()) {
+ throw new IllegalStateException("Not enough capacity to add batchMessages in batch queue");
+ }
+
+ // Add batchMessages to batch queue
+ for (String message : batchMessages) {
+ batchQueue.add(message);
+ }
+
+ // returns current elements size in batch queue
+ return batchQueue.size();
+ }
+
+ @Override
+ public synchronized int addRecoverableMessages(List<String> recoverableMessages) {
+
+ // checks if messages size does not exceed recovery queue size
+ if (recoverableMessages.size() > recoveryQueue.remainingCapacity()) {
+ throw new IllegalStateException("Not enough capacity to add messages in recovery queue");
+ }
+
+ // add messages to recovery queue
+ for (String recoverableMessage : recoverableMessages) {
+ recoveryQueue.add(recoverableMessage);
+ }
+
+ // returns current size of recovery queue
+ return recoveryQueue.size();
+ }
+
+ @Override
+ public synchronized List<String> getMessageForPublishing() {
+
+ final List<String> recoveryMessageList = new LinkedList<>();
+ final List<String> batchMessagesList = new LinkedList<>();
+
+ // get messages from recovery queue if present
+ if (!recoveryQueue.isEmpty()) {
+ final int recoveryQueueSize = recoveryQueue.drainTo(recoveryMessageList);
+ LOG.debug("Drained Recovery Queue elements for flushing: {}", recoveryQueueSize);
+ }
+
+ // get messages from batch queue if present
+ if (!batchQueue.isEmpty()) {
+ final int batchQueueSize = batchQueue.drainTo(batchMessagesList);
+ LOG.debug("Drained Batch Queue elements for flushing: {}", batchQueueSize);
+ }
+
+ // concat recovery and batch queue elements
+ return unmodifiableList(newLinkedList(concat(recoveryMessageList, batchMessagesList)));
+ }
+
+ @Override
+ public synchronized int getBatchQueueRemainingSize() {
+ return batchQueue.remainingCapacity();
+ }
+
+ @Override
+ public synchronized int getRecoveryQueueRemainingSize() {
+ return recoveryQueue.remainingCapacity();
+ }
+}