aboutsummaryrefslogtreecommitdiffstats
path: root/sdc-distribution-client/src/main/java/org/onap/sdc/impl/NotificationConsumer.java
diff options
context:
space:
mode:
authorefiacor <fiachra.corcoran@est.tech>2022-06-16 09:38:26 +0100
committerefiacor <fiachra.corcoran@est.tech>2022-10-10 17:40:51 +0100
commitcff56489f774f937654cb6eb198d3d5ef41418a2 (patch)
tree3819828c2fed7d46536253ff2f35bcf0a3c9c031 /sdc-distribution-client/src/main/java/org/onap/sdc/impl/NotificationConsumer.java
parent1b46a6e1d6fcf9788c9f18552f6f6b8fed60126c (diff)
[STRIMZI] Migrate client from cambria to kafka native
Add call to sdc to get kafka and topic details Add kafka config to IConfiguration interface Signed-off-by: efiacor <fiachra.corcoran@est.tech> Change-Id: Ibec77d1ff1cd25ad4adce133ee81d66e54c7707f Issue-ID: DMAAP-1745
Diffstat (limited to 'sdc-distribution-client/src/main/java/org/onap/sdc/impl/NotificationConsumer.java')
-rw-r--r--sdc-distribution-client/src/main/java/org/onap/sdc/impl/NotificationConsumer.java72
1 files changed, 35 insertions, 37 deletions
diff --git a/sdc-distribution-client/src/main/java/org/onap/sdc/impl/NotificationConsumer.java b/sdc-distribution-client/src/main/java/org/onap/sdc/impl/NotificationConsumer.java
index bf28d97..c59612a 100644
--- a/sdc-distribution-client/src/main/java/org/onap/sdc/impl/NotificationConsumer.java
+++ b/sdc-distribution-client/src/main/java/org/onap/sdc/impl/NotificationConsumer.java
@@ -20,34 +20,32 @@
package org.onap.sdc.impl;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
import java.util.ArrayList;
import java.util.List;
-
+import org.onap.sdc.api.consumer.INotificationCallback;
import org.onap.sdc.api.notification.IArtifactInfo;
+import org.onap.sdc.api.notification.INotificationData;
import org.onap.sdc.api.notification.IResourceInstance;
import org.onap.sdc.api.results.IDistributionClientResult;
-import org.onap.sdc.utils.DistributionActionResultEnum;
-import org.onap.sdc.api.consumer.INotificationCallback;
-import org.onap.sdc.api.notification.INotificationData;
import org.onap.sdc.utils.ArtifactTypeEnum;
+import org.onap.sdc.utils.DistributionActionResultEnum;
+import org.onap.sdc.utils.kafka.SdcKafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.att.nsa.cambria.client.CambriaConsumer;
-import com.google.gson.Gson;
-import com.google.gson.GsonBuilder;
-
class NotificationConsumer implements Runnable {
- private static Logger log = LoggerFactory.getLogger(NotificationConsumer.class.getName());
+ private static final Logger log = LoggerFactory.getLogger(NotificationConsumer.class.getName());
- private CambriaConsumer cambriaConsumer;
- private INotificationCallback clientCallback;
- private List<String> artifactsTypes;
- private DistributionClientImpl distributionClient;
+ private final SdcKafkaConsumer kafkaConsumer;
+ private final INotificationCallback clientCallback;
+ private final List<String> artifactsTypes;
+ private final DistributionClientImpl distributionClient;
- NotificationConsumer(CambriaConsumer cambriaConsumer, INotificationCallback clientCallback, List<String> artifactsTypes, DistributionClientImpl distributionClient) {
- this.cambriaConsumer = cambriaConsumer;
+ NotificationConsumer(SdcKafkaConsumer kafkaConsumer, INotificationCallback clientCallback, List<String> artifactsTypes, DistributionClientImpl distributionClient) {
+ this.kafkaConsumer = kafkaConsumer;
this.clientCallback = clientCallback;
this.artifactsTypes = artifactsTypes;
this.distributionClient = distributionClient;
@@ -55,16 +53,16 @@ class NotificationConsumer implements Runnable {
@Override
public void run() {
-
try {
Gson gson = new GsonBuilder().setPrettyPrinting().create();
long currentTimeMillis = System.currentTimeMillis();
- for (String notificationMsg : cambriaConsumer.fetch()) {
+ log.info("Polling for messages from topic: {}", kafkaConsumer.getTopicName());
+ for (String notificationMsg : kafkaConsumer.poll()) {
log.debug("received message from topic");
- log.debug("recieved notification from broker: {}", notificationMsg);
+ log.debug("received notification from broker: {}", notificationMsg);
- final NotificationDataImpl notificationFromUEB = gson.fromJson(notificationMsg, NotificationDataImpl.class);
- NotificationDataImpl notificationForCallback = buildCallbackNotificationLogic(currentTimeMillis, notificationFromUEB);
+ final NotificationDataImpl notificationFromMessageBus = gson.fromJson(notificationMsg, NotificationDataImpl.class);
+ NotificationDataImpl notificationForCallback = buildCallbackNotificationLogic(currentTimeMillis, notificationFromMessageBus);
if (isActivateCallback(notificationForCallback)) {
String stringNotificationForCallback = gson.toJson(notificationForCallback);
log.debug("sending notification to client: {}", stringNotificationForCallback);
@@ -73,8 +71,8 @@ class NotificationConsumer implements Runnable {
}
} catch (Exception e) {
- log.error("Error exception occured when fetching with Cambria Client:{}", e.getMessage());
- log.debug("Error exception occured when fetching with Cambria Client:{}", e.getMessage(), e);
+ log.error("Error exception occurred when fetching with Kafka Consumer:{}", e.getMessage());
+ log.debug("Error exception occurred when fetching with Kafka Consumer:{}", e.getMessage(), e);
}
}
@@ -85,21 +83,21 @@ class NotificationConsumer implements Runnable {
return hasRelevantArtifactsInResourceInstance || hasRelevantArtifactsInService;
}
- protected NotificationDataImpl buildCallbackNotificationLogic(long currentTimeMillis, final NotificationDataImpl notificationFromUEB) {
- List<IResourceInstance> relevantResourceInstances = buildResourceInstancesLogic(notificationFromUEB, currentTimeMillis);
- List<ArtifactInfoImpl> relevantServiceArtifacts = handleRelevantArtifacts(notificationFromUEB, currentTimeMillis, notificationFromUEB.getServiceArtifactsImpl());
- notificationFromUEB.setResources(relevantResourceInstances);
- notificationFromUEB.setServiceArtifacts(relevantServiceArtifacts);
- return notificationFromUEB;
+ protected NotificationDataImpl buildCallbackNotificationLogic(long currentTimeMillis, final NotificationDataImpl notificationFromMessageBus) {
+ List<IResourceInstance> relevantResourceInstances = buildResourceInstancesLogic(notificationFromMessageBus, currentTimeMillis);
+ List<ArtifactInfoImpl> relevantServiceArtifacts = handleRelevantArtifacts(notificationFromMessageBus, currentTimeMillis, notificationFromMessageBus.getServiceArtifactsImpl());
+ notificationFromMessageBus.setResources(relevantResourceInstances);
+ notificationFromMessageBus.setServiceArtifacts(relevantServiceArtifacts);
+ return notificationFromMessageBus;
}
- private List<IResourceInstance> buildResourceInstancesLogic(NotificationDataImpl notificationFromUEB, long currentTimeMillis) {
+ private List<IResourceInstance> buildResourceInstancesLogic(NotificationDataImpl notificationFromMessageBus, long currentTimeMillis) {
List<IResourceInstance> relevantResourceInstances = new ArrayList<>();
- for (JsonContainerResourceInstance resourceInstance : notificationFromUEB.getResourcesImpl()) {
+ for (JsonContainerResourceInstance resourceInstance : notificationFromMessageBus.getResourcesImpl()) {
final List<ArtifactInfoImpl> artifactsImplList = resourceInstance.getArtifactsImpl();
- List<ArtifactInfoImpl> foundRelevantArtifacts = handleRelevantArtifacts(notificationFromUEB, currentTimeMillis, artifactsImplList);
+ List<ArtifactInfoImpl> foundRelevantArtifacts = handleRelevantArtifacts(notificationFromMessageBus, currentTimeMillis, artifactsImplList);
if (!foundRelevantArtifacts.isEmpty() || distributionClient.getConfiguration().isFilterInEmptyResources()) {
resourceInstance.setArtifacts(foundRelevantArtifacts);
relevantResourceInstances.add(resourceInstance);
@@ -109,17 +107,17 @@ class NotificationConsumer implements Runnable {
}
- private List<ArtifactInfoImpl> handleRelevantArtifacts(NotificationDataImpl notificationFromUEB, long currentTimeMillis, final List<ArtifactInfoImpl> artifactsImplList) {
+ private List<ArtifactInfoImpl> handleRelevantArtifacts(NotificationDataImpl notificationFromMessageBus, long currentTimeMillis, final List<ArtifactInfoImpl> artifactsImplList) {
List<ArtifactInfoImpl> relevantArtifacts = new ArrayList<>();
if (artifactsImplList != null) {
for (ArtifactInfoImpl artifactInfo : artifactsImplList) {
- handleRelevantArtifact(notificationFromUEB, currentTimeMillis, artifactsImplList, relevantArtifacts, artifactInfo);
+ handleRelevantArtifact(notificationFromMessageBus, currentTimeMillis, artifactsImplList, relevantArtifacts, artifactInfo);
}
}
return relevantArtifacts;
}
- private void handleRelevantArtifact(NotificationDataImpl notificationFromUEB, long currentTimeMillis, final List<ArtifactInfoImpl> artifactsImplList, List<ArtifactInfoImpl> relevantArtifacts, ArtifactInfoImpl artifactInfo) {
+ private void handleRelevantArtifact(NotificationDataImpl notificationFromMessageBus, long currentTimeMillis, final List<ArtifactInfoImpl> artifactsImplList, List<ArtifactInfoImpl> relevantArtifacts, ArtifactInfoImpl artifactInfo) {
boolean isArtifactRelevant = artifactsTypes.contains(artifactInfo.getArtifactType());
String artifactType = artifactInfo.getArtifactType();
if (artifactInfo.getGeneratedFromUUID() != null && !artifactInfo.getGeneratedFromUUID().isEmpty()) {
@@ -131,16 +129,16 @@ class NotificationConsumer implements Runnable {
}
}
if (isArtifactRelevant) {
- setRelatedArtifacts(artifactInfo, notificationFromUEB);
+ setRelatedArtifacts(artifactInfo, notificationFromMessageBus);
if (artifactType.equals(ArtifactTypeEnum.HEAT.name()) || artifactType.equals(ArtifactTypeEnum.HEAT_VOL.name()) || artifactType.equals(ArtifactTypeEnum.HEAT_NET.name())) {
setGeneratedArtifact(artifactsImplList, artifactInfo);
}
relevantArtifacts.add(artifactInfo);
}
- IDistributionClientResult notificationStatus = distributionClient.sendNotificationStatus(currentTimeMillis, notificationFromUEB.getDistributionID(), artifactInfo, isArtifactRelevant);
+ IDistributionClientResult notificationStatus = distributionClient.sendNotificationStatus(currentTimeMillis, notificationFromMessageBus.getDistributionID(), artifactInfo, isArtifactRelevant);
if (notificationStatus.getDistributionActionResult() != DistributionActionResultEnum.SUCCESS) {
- log.error("Error failed to send notification status to UEB failed status:{}, error message:{}", notificationStatus.getDistributionActionResult().name(), notificationStatus.getDistributionMessageResult());
+ log.error("Error failed to send notification status to MessageBus failed status:{}, error message:{}", notificationStatus.getDistributionActionResult().name(), notificationStatus.getDistributionMessageResult());
}
}