aboutsummaryrefslogtreecommitdiffstats
path: root/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionNotificationSender.java
diff options
context:
space:
mode:
Diffstat (limited to 'catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionNotificationSender.java')
-rw-r--r--catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionNotificationSender.java23
1 files changed, 17 insertions, 6 deletions
diff --git a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionNotificationSender.java b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionNotificationSender.java
index 0098eac7d9..b93d485bdb 100644
--- a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionNotificationSender.java
+++ b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionNotificationSender.java
@@ -19,6 +19,7 @@
*/
package org.openecomp.sdc.be.components.distribution.engine;
+import org.openecomp.sdc.be.components.kafka.KafkaHandler;
import org.openecomp.sdc.be.config.ConfigurationManager;
import org.openecomp.sdc.be.config.DistributionEngineConfiguration;
import org.openecomp.sdc.be.dao.api.ActionStatus;
@@ -36,16 +37,26 @@ public class DistributionNotificationSender {
private static final Logger logger = Logger.getLogger(DistributionNotificationSender.class.getName());
@javax.annotation.Resource
protected ComponentsUtils componentUtils;
- private CambriaHandler cambriaHandler = new CambriaHandler();
- private DistributionEngineConfiguration deConfiguration = ConfigurationManager.getConfigurationManager().getDistributionEngineConfiguration();
+ private final CambriaHandler cambriaHandler = new CambriaHandler();
+
+ private final KafkaHandler kafkaHandler = new KafkaHandler();
+
+//
+ private final DistributionEngineConfiguration deConfiguration = ConfigurationManager.getConfigurationManager().getDistributionEngineConfiguration();
public ActionStatus sendNotification(String topicName, String distributionId, EnvironmentMessageBusData messageBusData,
INotificationData notificationData, Service service, User modifier) {
long startTime = System.currentTimeMillis();
- CambriaErrorResponse status = cambriaHandler
- .sendNotificationAndClose(topicName, messageBusData.getUebPublicKey(), messageBusData.getUebPrivateKey(),
- messageBusData.getDmaaPuebEndpoints(), notificationData,
- deConfiguration.getDistributionNotificationTopic().getMaxWaitingAfterSendingSeconds());
+ CambriaErrorResponse status;
+ if (Boolean.FALSE.equals(kafkaHandler.isKafkaActive())) {
+ status = cambriaHandler
+ .sendNotificationAndClose(topicName, messageBusData.getUebPublicKey(), messageBusData.getUebPrivateKey(),
+ messageBusData.getDmaaPuebEndpoints(), notificationData,
+ deConfiguration.getDistributionNotificationTopic().getMaxWaitingAfterSendingSeconds());
+ }
+ else{
+ status = kafkaHandler.sendNotification(topicName, notificationData);
+ }
logger.info("After publishing service {} of version {}. Status is {}", service.getName(), service.getVersion(), status.getHttpCode());
auditDistributionNotification(
new AuditDistributionNotificationBuilder().setTopicName(topicName).setDistributionId(distributionId).setStatus(status).setService(service)