diff options
Diffstat (limited to 'catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionNotificationSender.java')
-rw-r--r-- | catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionNotificationSender.java | 23 |
1 files changed, 17 insertions, 6 deletions
diff --git a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionNotificationSender.java b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionNotificationSender.java index 0098eac7d9..b93d485bdb 100644 --- a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionNotificationSender.java +++ b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionNotificationSender.java @@ -19,6 +19,7 @@ */ package org.openecomp.sdc.be.components.distribution.engine; +import org.openecomp.sdc.be.components.kafka.KafkaHandler; import org.openecomp.sdc.be.config.ConfigurationManager; import org.openecomp.sdc.be.config.DistributionEngineConfiguration; import org.openecomp.sdc.be.dao.api.ActionStatus; @@ -36,16 +37,26 @@ public class DistributionNotificationSender { private static final Logger logger = Logger.getLogger(DistributionNotificationSender.class.getName()); @javax.annotation.Resource protected ComponentsUtils componentUtils; - private CambriaHandler cambriaHandler = new CambriaHandler(); - private DistributionEngineConfiguration deConfiguration = ConfigurationManager.getConfigurationManager().getDistributionEngineConfiguration(); + private final CambriaHandler cambriaHandler = new CambriaHandler(); + + private final KafkaHandler kafkaHandler = new KafkaHandler(); + +// + private final DistributionEngineConfiguration deConfiguration = ConfigurationManager.getConfigurationManager().getDistributionEngineConfiguration(); public ActionStatus sendNotification(String topicName, String distributionId, EnvironmentMessageBusData messageBusData, INotificationData notificationData, Service service, User modifier) { long startTime = System.currentTimeMillis(); - CambriaErrorResponse status = cambriaHandler - .sendNotificationAndClose(topicName, messageBusData.getUebPublicKey(), messageBusData.getUebPrivateKey(), - messageBusData.getDmaaPuebEndpoints(), notificationData, - deConfiguration.getDistributionNotificationTopic().getMaxWaitingAfterSendingSeconds()); + CambriaErrorResponse status; + if (Boolean.FALSE.equals(kafkaHandler.isKafkaActive())) { + status = cambriaHandler + .sendNotificationAndClose(topicName, messageBusData.getUebPublicKey(), messageBusData.getUebPrivateKey(), + messageBusData.getDmaaPuebEndpoints(), notificationData, + deConfiguration.getDistributionNotificationTopic().getMaxWaitingAfterSendingSeconds()); + } + else{ + status = kafkaHandler.sendNotification(topicName, notificationData); + } logger.info("After publishing service {} of version {}. Status is {}", service.getName(), service.getVersion(), status.getHttpCode()); auditDistributionNotification( new AuditDistributionNotificationBuilder().setTopicName(topicName).setDistributionId(distributionId).setStatus(status).setService(service) |