summaryrefslogtreecommitdiffstats
path: root/cps-ncmp-service/src/main
diff options
context:
space:
mode:
Diffstat (limited to 'cps-ncmp-service/src/main')
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/EventsPublisher.java37
-rw-r--r--cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/trustlevel/dmiavailability/DMiPluginWatchDog.java2
2 files changed, 18 insertions, 21 deletions
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/EventsPublisher.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/EventsPublisher.java
index 49e455e580..355e5cdf79 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/EventsPublisher.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/EventsPublisher.java
@@ -64,8 +64,8 @@ public class EventsPublisher<T> {
cloudEventKafkaTemplate.send(topicName, eventKey, event);
eventFuture.whenComplete((result, e) -> {
if (e == null) {
- log.debug("Successfully published event to topic : {} , Event : {}",
- result.getRecordMetadata().topic(), result.getProducerRecord().value());
+ log.debug("Successfully published event to topic : {} , Event : {}", result.getRecordMetadata().topic(),
+ result.getProducerRecord().value());
} else {
log.error("Unable to publish event to topic : {} due to {}", topicName, e.getMessage());
@@ -85,14 +85,7 @@ public class EventsPublisher<T> {
public void publishEvent(final String topicName, final String eventKey, final T event) {
final CompletableFuture<SendResult<String, T>> eventFuture =
legacyKafkaEventTemplate.send(topicName, eventKey, event);
- eventFuture.whenComplete((result, e) -> {
- if (e == null) {
- log.debug("Successfully published event to topic : {} , Event : {}",
- result.getRecordMetadata().topic(), result.getProducerRecord().value());
- } else {
- log.error("Unable to publish event to topic : {} due to {}", topicName, e.getMessage());
- }
- });
+ handleLegacyEventCallback(topicName, eventFuture);
}
/**
@@ -107,16 +100,8 @@ public class EventsPublisher<T> {
final ProducerRecord<String, T> producerRecord =
new ProducerRecord<>(topicName, null, eventKey, event, eventHeaders);
- final CompletableFuture<SendResult<String, T>> eventFuture =
- legacyKafkaEventTemplate.send(producerRecord);
- eventFuture.whenComplete((result, ex) -> {
- if (ex != null) {
- log.error("Unable to publish event to topic : {} due to {}", topicName, ex.getMessage());
- } else {
- log.debug("Successfully published event to topic : {} , Event : {}",
- result.getRecordMetadata().topic(), result.getProducerRecord().value());
- }
- });
+ final CompletableFuture<SendResult<String, T>> eventFuture = legacyKafkaEventTemplate.send(producerRecord);
+ handleLegacyEventCallback(topicName, eventFuture);
}
/**
@@ -133,6 +118,18 @@ public class EventsPublisher<T> {
publishEvent(topicName, eventKey, convertToKafkaHeaders(eventHeaders), event);
}
+ private void handleLegacyEventCallback(final String topicName,
+ final CompletableFuture<SendResult<String, T>> eventFuture) {
+ eventFuture.whenComplete((result, e) -> {
+ if (e != null) {
+ log.error("Unable to publish event to topic : {} due to {}", topicName, e.getMessage());
+ } else {
+ log.debug("Successfully published event to topic : {} , Event : {}", result.getRecordMetadata().topic(),
+ result.getProducerRecord().value());
+ }
+ });
+ }
+
private Headers convertToKafkaHeaders(final Map<String, Object> eventMessageHeaders) {
final Headers eventHeaders = new RecordHeaders();
eventMessageHeaders.forEach((key, value) -> eventHeaders.add(key, SerializationUtils.serialize(value)));
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/trustlevel/dmiavailability/DMiPluginWatchDog.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/trustlevel/dmiavailability/DMiPluginWatchDog.java
index dac32aa736..d3b95eacbf 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/trustlevel/dmiavailability/DMiPluginWatchDog.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/trustlevel/dmiavailability/DMiPluginWatchDog.java
@@ -46,7 +46,7 @@ public class DMiPluginWatchDog {
*/
@Scheduled(fixedDelayString = "${ncmp.timers.trust-evel.dmi-availability-watchdog-ms:30000}")
public void watchDmiPluginAliveness() {
- trustLevelPerDmiPlugin.keySet().forEach((dmiPluginName) -> {
+ trustLevelPerDmiPlugin.keySet().forEach(dmiPluginName -> {
final DmiPluginStatus dmiPluginStatus = dmiRestClient.getDmiPluginStatus(dmiPluginName);
log.debug("Trust level for dmi-plugin: {} is {}", dmiPluginName, dmiPluginStatus.toString());
if (DmiPluginStatus.UP.equals(dmiPluginStatus)) {