aboutsummaryrefslogtreecommitdiffstats
path: root/sdc-distribution-client/src/main/java/org/onap/sdc/impl/StatusConsumer.java
diff options
context:
space:
mode:
Diffstat (limited to 'sdc-distribution-client/src/main/java/org/onap/sdc/impl/StatusConsumer.java')
-rw-r--r--sdc-distribution-client/src/main/java/org/onap/sdc/impl/StatusConsumer.java29
1 files changed, 13 insertions, 16 deletions
diff --git a/sdc-distribution-client/src/main/java/org/onap/sdc/impl/StatusConsumer.java b/sdc-distribution-client/src/main/java/org/onap/sdc/impl/StatusConsumer.java
index 5951ed0..2c69330 100644
--- a/sdc-distribution-client/src/main/java/org/onap/sdc/impl/StatusConsumer.java
+++ b/sdc-distribution-client/src/main/java/org/onap/sdc/impl/StatusConsumer.java
@@ -20,24 +20,23 @@
package org.onap.sdc.impl;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
import org.onap.sdc.api.consumer.IStatusCallback;
import org.onap.sdc.api.notification.IStatusData;
+import org.onap.sdc.utils.kafka.SdcKafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.att.nsa.cambria.client.CambriaConsumer;
-import com.google.gson.Gson;
-import com.google.gson.GsonBuilder;
-
class StatusConsumer implements Runnable {
- private static Logger log = LoggerFactory.getLogger(StatusConsumer.class.getName());
+ private static final Logger log = LoggerFactory.getLogger(StatusConsumer.class.getName());
- private CambriaConsumer cambriaConsumer;
- private IStatusCallback clientCallback;
+ private final SdcKafkaConsumer kafkaConsumer;
+ private final IStatusCallback clientCallback;
- StatusConsumer(CambriaConsumer cambriaConsumer, IStatusCallback clientCallback) {
- this.cambriaConsumer = cambriaConsumer;
+ StatusConsumer(SdcKafkaConsumer kafkaConsumer, IStatusCallback clientCallback) {
+ this.kafkaConsumer = kafkaConsumer;
this.clientCallback = clientCallback;
}
@@ -46,18 +45,16 @@ class StatusConsumer implements Runnable {
try {
Gson gson = new GsonBuilder().setPrettyPrinting().create();
- for (String statusMsg : cambriaConsumer.fetch()) {
+ log.info("Polling for messages from topic: {}", kafkaConsumer.getTopicName());
+ for (String statusMsg : kafkaConsumer.poll()) {
log.debug("received message from topic");
- log.debug("recieved notification from broker: {}", statusMsg);
+ log.debug("received notification from broker: {}", statusMsg);
IStatusData statusData = gson.fromJson(statusMsg, StatusDataImpl.class);
clientCallback.activateCallback(statusData);
-
-
}
-
} catch (Exception e) {
- log.error("Error exception occured when fetching with Cambria Client:{}", e.getMessage());
- log.debug("Error exception occured when fetching with Cambria Client:{}", e.getMessage(), e);
+ log.error("Error exception occurred when fetching with Kafka Consumer:{}", e.getMessage());
+ log.debug("Error exception occurred when fetching with Kafka Consumer:{}", e.getMessage(), e);
}
}