diff options
author | efiacor <fiachra.corcoran@est.tech> | 2022-06-16 09:38:26 +0100 |
---|---|---|
committer | efiacor <fiachra.corcoran@est.tech> | 2022-10-10 17:40:51 +0100 |
commit | cff56489f774f937654cb6eb198d3d5ef41418a2 (patch) | |
tree | 3819828c2fed7d46536253ff2f35bcf0a3c9c031 /sdc-distribution-client/src/main/java/org/onap/sdc/utils/NotificationSender.java | |
parent | 1b46a6e1d6fcf9788c9f18552f6f6b8fed60126c (diff) |
[STRIMZI] Migrate client from cambria to kafka native
Add call to sdc to get kafka and topic details
Add kafka config to IConfiguration interface
Signed-off-by: efiacor <fiachra.corcoran@est.tech>
Change-Id: Ibec77d1ff1cd25ad4adce133ee81d66e54c7707f
Issue-ID: DMAAP-1745
Diffstat (limited to 'sdc-distribution-client/src/main/java/org/onap/sdc/utils/NotificationSender.java')
-rw-r--r-- | sdc-distribution-client/src/main/java/org/onap/sdc/utils/NotificationSender.java | 43 |
1 files changed, 17 insertions, 26 deletions
diff --git a/sdc-distribution-client/src/main/java/org/onap/sdc/utils/NotificationSender.java b/sdc-distribution-client/src/main/java/org/onap/sdc/utils/NotificationSender.java index 1fb71a6..44a9ddb 100644 --- a/sdc-distribution-client/src/main/java/org/onap/sdc/utils/NotificationSender.java +++ b/sdc-distribution-client/src/main/java/org/onap/sdc/utils/NotificationSender.java @@ -20,55 +20,46 @@ package org.onap.sdc.utils; -import com.att.nsa.cambria.client.CambriaBatchingPublisher; -import com.att.nsa.cambria.client.CambriaPublisher; +import java.util.concurrent.TimeUnit; +import org.apache.kafka.common.KafkaException; import org.onap.sdc.api.results.IDistributionClientResult; import org.onap.sdc.impl.DistributionClientResultImpl; +import org.onap.sdc.utils.kafka.SdcKafkaProducer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.util.List; -import java.util.concurrent.TimeUnit; - public class NotificationSender { private static final Logger log = LoggerFactory.getLogger(NotificationSender.class); - private static final long PUBLISHER_CLOSING_TIMEOUT = 10L; private static final long SLEEP_TIME = 1; + private final SdcKafkaProducer producer; - private final List<String> brokerServers; - - public NotificationSender(List<String> brokerServers) { - this.brokerServers = brokerServers; + public NotificationSender(SdcKafkaProducer producer) { + this.producer = producer; } - public IDistributionClientResult send(CambriaBatchingPublisher publisher, String status) { + public IDistributionClientResult send(String topic, String status) { log.info("DistributionClient - sendStatus"); DistributionClientResultImpl distributionResult; try { - log.debug("Publisher server list: {}", brokerServers); - log.debug("Trying to send status: {}", status); - publisher.send("MyPartitionKey", status); + log.debug("Publisher server list: {}", producer.getMsgBusAddresses()); + log.info("Trying to send status: {} \n to topic {}", status, producer.getTopicName()); + producer.send(topic, "MyPartitionKey", status); TimeUnit.SECONDS.sleep(SLEEP_TIME); - } catch (IOException | InterruptedException e) { - log.error("DistributionClient - sendDownloadStatus. Failed to send download status", e); + } catch (KafkaException | InterruptedException e) { + log.error("DistributionClient - sendStatus. Failed to send status", e); } finally { - distributionResult = closePublisher(publisher); + distributionResult = closeProducer(); } return distributionResult; } - private DistributionClientResultImpl closePublisher(CambriaBatchingPublisher publisher) { + private DistributionClientResultImpl closeProducer() { DistributionClientResultImpl distributionResult = new DistributionClientResultImpl(DistributionActionResultEnum.GENERAL_ERROR, "Failed to send status"); try { - List<CambriaPublisher.message> notSentMessages = publisher.close(PUBLISHER_CLOSING_TIMEOUT, TimeUnit.SECONDS); - if (notSentMessages.isEmpty()) { - distributionResult = new DistributionClientResultImpl(DistributionActionResultEnum.SUCCESS, "Messages successfully sent"); - } else { - log.debug("DistributionClient - sendDownloadStatus. {} messages were not sent", notSentMessages.size()); - } - } catch (IOException | InterruptedException e) { + producer.flush(); + distributionResult = new DistributionClientResultImpl(DistributionActionResultEnum.SUCCESS, "Messages successfully sent"); + } catch (KafkaException | IllegalArgumentException e) { log.error("DistributionClient - sendDownloadStatus. Failed to send messages and close publisher.", e); } return distributionResult; |