summaryrefslogtreecommitdiffstats
path: root/src/main/java/org/onap/aai/validation/publisher/ValidationEventPublisher.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/org/onap/aai/validation/publisher/ValidationEventPublisher.java')
-rw-r--r--src/main/java/org/onap/aai/validation/publisher/ValidationEventPublisher.java187
1 files changed, 96 insertions, 91 deletions
diff --git a/src/main/java/org/onap/aai/validation/publisher/ValidationEventPublisher.java b/src/main/java/org/onap/aai/validation/publisher/ValidationEventPublisher.java
index 0cebbf9..c52ff10 100644
--- a/src/main/java/org/onap/aai/validation/publisher/ValidationEventPublisher.java
+++ b/src/main/java/org/onap/aai/validation/publisher/ValidationEventPublisher.java
@@ -1,22 +1,30 @@
-/*
- * ============LICENSE_START===================================================
- * Copyright (c) 2018 Amdocs
- * ============================================================================
+/**
+ * ============LICENSE_START=======================================================
+ * org.onap.aai
+ * ================================================================================
+ * Copyright © 2018-2019 AT&T Intellectual Property. All rights reserved.
+ * Copyright © 2018-2019 European Software Marketing Ltd.
+ * ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
- * ============LICENSE_END=====================================================
+ * ============LICENSE_END=========================================================
*/
+
package org.onap.aai.validation.publisher;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import javax.inject.Inject;
import org.onap.aai.event.client.DMaaPEventPublisher;
import org.onap.aai.validation.config.TopicAdminConfig;
import org.onap.aai.validation.config.TopicConfig;
@@ -26,10 +34,6 @@ import org.onap.aai.validation.exception.ValidationServiceException;
import org.onap.aai.validation.factory.DMaaPEventPublisherFactory;
import org.onap.aai.validation.logging.ApplicationMsgs;
import org.onap.aai.validation.logging.LogHelper;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import javax.inject.Inject;
/**
* Event Publisher
@@ -39,126 +43,127 @@ public class ValidationEventPublisher implements MessagePublisher {
private static LogHelper applicationLogger = LogHelper.INSTANCE;
- private List<Topic> publisherTopics;
+ private List<Topic> publisherTopics;
- private boolean enablePublishing;
+ private boolean enablePublishing;
- private long retries;
+ private long retries;
- private long retriesRemaining;
+ private long retriesRemaining;
private DMaaPEventPublisherFactory dMaapFactory;
- /**
- * Instantiates an Event Publisher instance using properties from config file.
- *
- * @param topicConfig
- * @param topicAdminConfig
- */
- @Inject
+ /**
+ * Instantiates an Event Publisher instance using properties from config file.
+ *
+ * @param topicConfig
+ * @param topicAdminConfig
+ */
+ @Inject
public ValidationEventPublisher(TopicConfig topicConfig, TopicAdminConfig topicAdminConfig) {
- enablePublishing = topicAdminConfig.isPublishEnable();
- if (enablePublishing) {
- publisherTopics = topicConfig.getPublisherTopics();
- retries = topicAdminConfig.getPublishRetries();
- }
+ enablePublishing = topicAdminConfig.isPublishEnable();
+ if (enablePublishing) {
+ publisherTopics = topicConfig.getPublisherTopics();
+ retries = topicAdminConfig.getPublishRetries();
+ }
dMaapFactory = new DMaaPEventPublisherFactory();
- }
-
- /**
- * Connect to the event publisher, add the message, and then publish it by closing the publisher.
- */
- @Override
- public void publishMessage(String message) throws ValidationServiceException {
- Collection<String> messages = new ArrayList<>();
- messages.add(message);
- publishMessages(messages);
- }
-
- /**
- * Connect to the event publisher, adds the messages, and then publish them by closing the publisher.
- */
- @Override
- public void publishMessages(Collection<String> messages) throws ValidationServiceException {
- if (!enablePublishing) {
- return;
- } else {
+ }
+
+ /**
+ * Connect to the event publisher, add the message, and then publish it by closing the publisher.
+ */
+ @Override
+ public void publishMessage(String message) throws ValidationServiceException {
+ Collection<String> messages = new ArrayList<>();
+ messages.add(message);
+ publishMessages(messages);
+ }
+
+ /**
+ * Connect to the event publisher, adds the messages, and then publish them by closing the publisher.
+ */
+ @Override
+ public void publishMessages(Collection<String> messages) throws ValidationServiceException {
+ if (!enablePublishing) {
+ return;
+ } else {
applicationLogger.debug("Publishing messages: " + messages);
- for (Topic topic : publisherTopics) {
- retriesRemaining = retries;
- publishMessages(messages, topic);
- }
- }
- }
+ for (Topic topic : publisherTopics) {
+ retriesRemaining = retries;
+ publishMessages(messages, topic);
+ }
+ }
+ }
- private void publishMessages(Collection<String> messages, Topic topic) throws ValidationServiceException {
+ private void publishMessages(Collection<String> messages, Topic topic) throws ValidationServiceException {
- DMaaPEventPublisher dMaapEventPublisher = dMaapFactory.createEventPublisher(topic.getHost(), topic.getName(), topic.getUsername(),
- topic.getPassword(), topic.getTransportType());
+ DMaaPEventPublisher dMaapEventPublisher = dMaapFactory.createEventPublisher(topic.getHost(), topic.getName(),
+ topic.getUsername(), topic.getPassword(), topic.getTransportType());
- try {
- // Add our message to the publisher's queue/bus
+ try {
+ // Add our message to the publisher's queue/bus
int result = dMaapEventPublisher.sendSync(topic.getPartition(), messages);
- if (result != messages.size()) {
- applicationLogger.warn(ApplicationMsgs.UNSENT_MESSAGE_WARN);
+ if (result != messages.size()) {
+ applicationLogger.warn(ApplicationMsgs.UNSENT_MESSAGE_WARN);
closeEventPublisher(dMaapEventPublisher);
retryOrThrow(messages, topic, new ValidationServiceException(
ValidationServiceError.EVENT_CLIENT_INCORRECT_NUMBER_OF_MESSAGES_SENT, result));
- }
+ }
} catch (Exception e) {
- applicationLogger.error(ApplicationMsgs.UNSENT_MESSAGE_ERROR);
+ applicationLogger.error(ApplicationMsgs.UNSENT_MESSAGE_ERROR);
closeEventPublisher(dMaapEventPublisher);
retryOrThrow(messages, topic,
new ValidationServiceException(ValidationServiceError.EVENT_CLIENT_SEND_ERROR, e));
- }
+ }
completeMessageSending(dMaapEventPublisher, topic);
- }
-
- /**
- * Publish the queued messages by closing the publisher.
- *
- * @param eventPublisher the publisher to close
- * @throws AuditException
- */
+ }
+
+ /**
+ * Publish the queued messages by closing the publisher.
+ *
+ * @param eventPublisher
+ * the publisher to close
+ * @throws AuditException
+ */
private void completeMessageSending(DMaaPEventPublisher eventPublisher, Topic topic)
throws ValidationServiceException {
List<String> unsentMsgs = closeEventPublisher(eventPublisher);
- if (unsentMsgs != null && !unsentMsgs.isEmpty()) {
- // Log the error, as the exception will not be propagated due to the fact that the Cambria Client throws
- // an exception first in a separate thread.
- applicationLogger.error(ApplicationMsgs.EVENT_CLIENT_CLOSE_UNSENT_MESSAGE,
+ if (unsentMsgs != null && !unsentMsgs.isEmpty()) {
+ // Log the error, as the exception will not be propagated due to the fact that the Cambria Client throws
+ // an exception first in a separate thread.
+ applicationLogger.error(ApplicationMsgs.EVENT_CLIENT_CLOSE_UNSENT_MESSAGE,
ValidationServiceError.EVENT_CLIENT_CLOSE_UNSENT_MESSAGE.getMessage(unsentMsgs));
retryOrThrow(unsentMsgs, topic, new ValidationServiceException(
ValidationServiceError.EVENT_CLIENT_CLOSE_UNSENT_MESSAGE, unsentMsgs));
- }
- }
+ }
+ }
private void retryOrThrow(Collection<String> messages, Topic topic, ValidationServiceException exceptionToThrow)
throws ValidationServiceException {
- if (retriesRemaining <= 0) {
- applicationLogger.warn(ApplicationMsgs.SEND_MESSAGE_ABORT_WARN);
- throw exceptionToThrow;
- } else {
- applicationLogger.warn(ApplicationMsgs.SEND_MESSAGE_RETRY_WARN);
- retriesRemaining--;
- publishMessages(messages, topic);
- }
- }
+ if (retriesRemaining <= 0) {
+ applicationLogger.warn(ApplicationMsgs.SEND_MESSAGE_ABORT_WARN);
+ throw exceptionToThrow;
+ } else {
+ applicationLogger.warn(ApplicationMsgs.SEND_MESSAGE_RETRY_WARN);
+ retriesRemaining--;
+ publishMessages(messages, topic);
+ }
+ }
private List<String> closeEventPublisher(DMaaPEventPublisher eventPublisher) throws ValidationServiceException {
- try {
+ try {
return eventPublisher.closeWithUnsent();
- } catch (Exception e) {
- throw new ValidationServiceException(ValidationServiceError.EVENT_CLIENT_CLOSE_ERROR, e);
- }
- }
+ } catch (Exception e) {
+ throw new ValidationServiceException(ValidationServiceError.EVENT_CLIENT_CLOSE_ERROR, e);
+ }
+ }
public void setEventPublisherFactory(DMaaPEventPublisherFactory dMaapFactory) {
this.dMaapFactory = dMaapFactory;
}
-} \ No newline at end of file
+}