diff options
Diffstat (limited to 'sdc-distribution-client/src/main/java/org/onap/sdc/impl/StatusConsumer.java')
-rw-r--r-- | sdc-distribution-client/src/main/java/org/onap/sdc/impl/StatusConsumer.java | 29 |
1 files changed, 13 insertions, 16 deletions
diff --git a/sdc-distribution-client/src/main/java/org/onap/sdc/impl/StatusConsumer.java b/sdc-distribution-client/src/main/java/org/onap/sdc/impl/StatusConsumer.java index 5951ed0..2c69330 100644 --- a/sdc-distribution-client/src/main/java/org/onap/sdc/impl/StatusConsumer.java +++ b/sdc-distribution-client/src/main/java/org/onap/sdc/impl/StatusConsumer.java @@ -20,24 +20,23 @@ package org.onap.sdc.impl; +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; import org.onap.sdc.api.consumer.IStatusCallback; import org.onap.sdc.api.notification.IStatusData; +import org.onap.sdc.utils.kafka.SdcKafkaConsumer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.att.nsa.cambria.client.CambriaConsumer; -import com.google.gson.Gson; -import com.google.gson.GsonBuilder; - class StatusConsumer implements Runnable { - private static Logger log = LoggerFactory.getLogger(StatusConsumer.class.getName()); + private static final Logger log = LoggerFactory.getLogger(StatusConsumer.class.getName()); - private CambriaConsumer cambriaConsumer; - private IStatusCallback clientCallback; + private final SdcKafkaConsumer kafkaConsumer; + private final IStatusCallback clientCallback; - StatusConsumer(CambriaConsumer cambriaConsumer, IStatusCallback clientCallback) { - this.cambriaConsumer = cambriaConsumer; + StatusConsumer(SdcKafkaConsumer kafkaConsumer, IStatusCallback clientCallback) { + this.kafkaConsumer = kafkaConsumer; this.clientCallback = clientCallback; } @@ -46,18 +45,16 @@ class StatusConsumer implements Runnable { try { Gson gson = new GsonBuilder().setPrettyPrinting().create(); - for (String statusMsg : cambriaConsumer.fetch()) { + log.info("Polling for messages from topic: {}", kafkaConsumer.getTopicName()); + for (String statusMsg : kafkaConsumer.poll()) { log.debug("received message from topic"); - log.debug("recieved notification from broker: {}", statusMsg); + log.debug("received notification from broker: {}", statusMsg); IStatusData statusData = gson.fromJson(statusMsg, StatusDataImpl.class); clientCallback.activateCallback(statusData); - - } - } catch (Exception e) { - log.error("Error exception occured when fetching with Cambria Client:{}", e.getMessage()); - log.debug("Error exception occured when fetching with Cambria Client:{}", e.getMessage(), e); + log.error("Error exception occurred when fetching with Kafka Consumer:{}", e.getMessage()); + log.debug("Error exception occurred when fetching with Kafka Consumer:{}", e.getMessage(), e); } } |