diff options
author | efiacor <fiachra.corcoran@est.tech> | 2022-06-16 09:38:26 +0100 |
---|---|---|
committer | efiacor <fiachra.corcoran@est.tech> | 2022-10-10 17:40:51 +0100 |
commit | cff56489f774f937654cb6eb198d3d5ef41418a2 (patch) | |
tree | 3819828c2fed7d46536253ff2f35bcf0a3c9c031 /sdc-distribution-client/src/main/java/org/onap/sdc/impl/NotificationConsumer.java | |
parent | 1b46a6e1d6fcf9788c9f18552f6f6b8fed60126c (diff) |
[STRIMZI] Migrate client from cambria to kafka native
Add call to sdc to get kafka and topic details
Add kafka config to IConfiguration interface
Signed-off-by: efiacor <fiachra.corcoran@est.tech>
Change-Id: Ibec77d1ff1cd25ad4adce133ee81d66e54c7707f
Issue-ID: DMAAP-1745
Diffstat (limited to 'sdc-distribution-client/src/main/java/org/onap/sdc/impl/NotificationConsumer.java')
-rw-r--r-- | sdc-distribution-client/src/main/java/org/onap/sdc/impl/NotificationConsumer.java | 72 |
1 files changed, 35 insertions, 37 deletions
diff --git a/sdc-distribution-client/src/main/java/org/onap/sdc/impl/NotificationConsumer.java b/sdc-distribution-client/src/main/java/org/onap/sdc/impl/NotificationConsumer.java index bf28d97..c59612a 100644 --- a/sdc-distribution-client/src/main/java/org/onap/sdc/impl/NotificationConsumer.java +++ b/sdc-distribution-client/src/main/java/org/onap/sdc/impl/NotificationConsumer.java @@ -20,34 +20,32 @@ package org.onap.sdc.impl; +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; import java.util.ArrayList; import java.util.List; - +import org.onap.sdc.api.consumer.INotificationCallback; import org.onap.sdc.api.notification.IArtifactInfo; +import org.onap.sdc.api.notification.INotificationData; import org.onap.sdc.api.notification.IResourceInstance; import org.onap.sdc.api.results.IDistributionClientResult; -import org.onap.sdc.utils.DistributionActionResultEnum; -import org.onap.sdc.api.consumer.INotificationCallback; -import org.onap.sdc.api.notification.INotificationData; import org.onap.sdc.utils.ArtifactTypeEnum; +import org.onap.sdc.utils.DistributionActionResultEnum; +import org.onap.sdc.utils.kafka.SdcKafkaConsumer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.att.nsa.cambria.client.CambriaConsumer; -import com.google.gson.Gson; -import com.google.gson.GsonBuilder; - class NotificationConsumer implements Runnable { - private static Logger log = LoggerFactory.getLogger(NotificationConsumer.class.getName()); + private static final Logger log = LoggerFactory.getLogger(NotificationConsumer.class.getName()); - private CambriaConsumer cambriaConsumer; - private INotificationCallback clientCallback; - private List<String> artifactsTypes; - private DistributionClientImpl distributionClient; + private final SdcKafkaConsumer kafkaConsumer; + private final INotificationCallback clientCallback; + private final List<String> artifactsTypes; + private final DistributionClientImpl distributionClient; - NotificationConsumer(CambriaConsumer cambriaConsumer, INotificationCallback clientCallback, List<String> artifactsTypes, DistributionClientImpl distributionClient) { - this.cambriaConsumer = cambriaConsumer; + NotificationConsumer(SdcKafkaConsumer kafkaConsumer, INotificationCallback clientCallback, List<String> artifactsTypes, DistributionClientImpl distributionClient) { + this.kafkaConsumer = kafkaConsumer; this.clientCallback = clientCallback; this.artifactsTypes = artifactsTypes; this.distributionClient = distributionClient; @@ -55,16 +53,16 @@ class NotificationConsumer implements Runnable { @Override public void run() { - try { Gson gson = new GsonBuilder().setPrettyPrinting().create(); long currentTimeMillis = System.currentTimeMillis(); - for (String notificationMsg : cambriaConsumer.fetch()) { + log.info("Polling for messages from topic: {}", kafkaConsumer.getTopicName()); + for (String notificationMsg : kafkaConsumer.poll()) { log.debug("received message from topic"); - log.debug("recieved notification from broker: {}", notificationMsg); + log.debug("received notification from broker: {}", notificationMsg); - final NotificationDataImpl notificationFromUEB = gson.fromJson(notificationMsg, NotificationDataImpl.class); - NotificationDataImpl notificationForCallback = buildCallbackNotificationLogic(currentTimeMillis, notificationFromUEB); + final NotificationDataImpl notificationFromMessageBus = gson.fromJson(notificationMsg, NotificationDataImpl.class); + NotificationDataImpl notificationForCallback = buildCallbackNotificationLogic(currentTimeMillis, notificationFromMessageBus); if (isActivateCallback(notificationForCallback)) { String stringNotificationForCallback = gson.toJson(notificationForCallback); log.debug("sending notification to client: {}", stringNotificationForCallback); @@ -73,8 +71,8 @@ class NotificationConsumer implements Runnable { } } catch (Exception e) { - log.error("Error exception occured when fetching with Cambria Client:{}", e.getMessage()); - log.debug("Error exception occured when fetching with Cambria Client:{}", e.getMessage(), e); + log.error("Error exception occurred when fetching with Kafka Consumer:{}", e.getMessage()); + log.debug("Error exception occurred when fetching with Kafka Consumer:{}", e.getMessage(), e); } } @@ -85,21 +83,21 @@ class NotificationConsumer implements Runnable { return hasRelevantArtifactsInResourceInstance || hasRelevantArtifactsInService; } - protected NotificationDataImpl buildCallbackNotificationLogic(long currentTimeMillis, final NotificationDataImpl notificationFromUEB) { - List<IResourceInstance> relevantResourceInstances = buildResourceInstancesLogic(notificationFromUEB, currentTimeMillis); - List<ArtifactInfoImpl> relevantServiceArtifacts = handleRelevantArtifacts(notificationFromUEB, currentTimeMillis, notificationFromUEB.getServiceArtifactsImpl()); - notificationFromUEB.setResources(relevantResourceInstances); - notificationFromUEB.setServiceArtifacts(relevantServiceArtifacts); - return notificationFromUEB; + protected NotificationDataImpl buildCallbackNotificationLogic(long currentTimeMillis, final NotificationDataImpl notificationFromMessageBus) { + List<IResourceInstance> relevantResourceInstances = buildResourceInstancesLogic(notificationFromMessageBus, currentTimeMillis); + List<ArtifactInfoImpl> relevantServiceArtifacts = handleRelevantArtifacts(notificationFromMessageBus, currentTimeMillis, notificationFromMessageBus.getServiceArtifactsImpl()); + notificationFromMessageBus.setResources(relevantResourceInstances); + notificationFromMessageBus.setServiceArtifacts(relevantServiceArtifacts); + return notificationFromMessageBus; } - private List<IResourceInstance> buildResourceInstancesLogic(NotificationDataImpl notificationFromUEB, long currentTimeMillis) { + private List<IResourceInstance> buildResourceInstancesLogic(NotificationDataImpl notificationFromMessageBus, long currentTimeMillis) { List<IResourceInstance> relevantResourceInstances = new ArrayList<>(); - for (JsonContainerResourceInstance resourceInstance : notificationFromUEB.getResourcesImpl()) { + for (JsonContainerResourceInstance resourceInstance : notificationFromMessageBus.getResourcesImpl()) { final List<ArtifactInfoImpl> artifactsImplList = resourceInstance.getArtifactsImpl(); - List<ArtifactInfoImpl> foundRelevantArtifacts = handleRelevantArtifacts(notificationFromUEB, currentTimeMillis, artifactsImplList); + List<ArtifactInfoImpl> foundRelevantArtifacts = handleRelevantArtifacts(notificationFromMessageBus, currentTimeMillis, artifactsImplList); if (!foundRelevantArtifacts.isEmpty() || distributionClient.getConfiguration().isFilterInEmptyResources()) { resourceInstance.setArtifacts(foundRelevantArtifacts); relevantResourceInstances.add(resourceInstance); @@ -109,17 +107,17 @@ class NotificationConsumer implements Runnable { } - private List<ArtifactInfoImpl> handleRelevantArtifacts(NotificationDataImpl notificationFromUEB, long currentTimeMillis, final List<ArtifactInfoImpl> artifactsImplList) { + private List<ArtifactInfoImpl> handleRelevantArtifacts(NotificationDataImpl notificationFromMessageBus, long currentTimeMillis, final List<ArtifactInfoImpl> artifactsImplList) { List<ArtifactInfoImpl> relevantArtifacts = new ArrayList<>(); if (artifactsImplList != null) { for (ArtifactInfoImpl artifactInfo : artifactsImplList) { - handleRelevantArtifact(notificationFromUEB, currentTimeMillis, artifactsImplList, relevantArtifacts, artifactInfo); + handleRelevantArtifact(notificationFromMessageBus, currentTimeMillis, artifactsImplList, relevantArtifacts, artifactInfo); } } return relevantArtifacts; } - private void handleRelevantArtifact(NotificationDataImpl notificationFromUEB, long currentTimeMillis, final List<ArtifactInfoImpl> artifactsImplList, List<ArtifactInfoImpl> relevantArtifacts, ArtifactInfoImpl artifactInfo) { + private void handleRelevantArtifact(NotificationDataImpl notificationFromMessageBus, long currentTimeMillis, final List<ArtifactInfoImpl> artifactsImplList, List<ArtifactInfoImpl> relevantArtifacts, ArtifactInfoImpl artifactInfo) { boolean isArtifactRelevant = artifactsTypes.contains(artifactInfo.getArtifactType()); String artifactType = artifactInfo.getArtifactType(); if (artifactInfo.getGeneratedFromUUID() != null && !artifactInfo.getGeneratedFromUUID().isEmpty()) { @@ -131,16 +129,16 @@ class NotificationConsumer implements Runnable { } } if (isArtifactRelevant) { - setRelatedArtifacts(artifactInfo, notificationFromUEB); + setRelatedArtifacts(artifactInfo, notificationFromMessageBus); if (artifactType.equals(ArtifactTypeEnum.HEAT.name()) || artifactType.equals(ArtifactTypeEnum.HEAT_VOL.name()) || artifactType.equals(ArtifactTypeEnum.HEAT_NET.name())) { setGeneratedArtifact(artifactsImplList, artifactInfo); } relevantArtifacts.add(artifactInfo); } - IDistributionClientResult notificationStatus = distributionClient.sendNotificationStatus(currentTimeMillis, notificationFromUEB.getDistributionID(), artifactInfo, isArtifactRelevant); + IDistributionClientResult notificationStatus = distributionClient.sendNotificationStatus(currentTimeMillis, notificationFromMessageBus.getDistributionID(), artifactInfo, isArtifactRelevant); if (notificationStatus.getDistributionActionResult() != DistributionActionResultEnum.SUCCESS) { - log.error("Error failed to send notification status to UEB failed status:{}, error message:{}", notificationStatus.getDistributionActionResult().name(), notificationStatus.getDistributionMessageResult()); + log.error("Error failed to send notification status to MessageBus failed status:{}, error message:{}", notificationStatus.getDistributionActionResult().name(), notificationStatus.getDistributionMessageResult()); } } |