From e5f840ed7d6c284fad4834e73715058c4f98ce90 Mon Sep 17 00:00:00 2001 From: Sumapriya Sarvepalli Date: Fri, 17 Aug 2018 16:12:52 +0530 Subject: Sonar Major issues This block of commented-out lines of code should be removed Sonar Link: https://sonar.onap.org/project/issues?assignees=sumapriya&id=org.onap.dmaap.messagerouter.msgrtr%3Amsgrtr&open=AWU41WkrwGn37JfbyH0E&resolved=false Location: src/main/java/com/att/dmf/mr/backends/kafka/Kafka011Consumer.java Line No:L45 L64 L129 L142 L261 L269 L283 L289 L363 L368 L377 L385 L399 L401 Change-Id: Id747f017cc0f09694181ce410c697895fcfafafa Issue-ID: DMAAP-610 Signed-off-by: Sumapriya Sarvepalli --- .../dmf/mr/backends/kafka/Kafka011Consumer.java | 32 +++++++++------------- 1 file changed, 13 insertions(+), 19 deletions(-) (limited to 'src/main') diff --git a/src/main/java/com/att/dmf/mr/backends/kafka/Kafka011Consumer.java b/src/main/java/com/att/dmf/mr/backends/kafka/Kafka011Consumer.java index f7f5ba7..6cd0230 100644 --- a/src/main/java/com/att/dmf/mr/backends/kafka/Kafka011Consumer.java +++ b/src/main/java/com/att/dmf/mr/backends/kafka/Kafka011Consumer.java @@ -42,8 +42,7 @@ import org.apache.kafka.common.KafkaException; import com.att.dmf.mr.backends.Consumer; import com.att.dmf.mr.constants.CambriaConstants; -//import org.slf4j.Logger; -//import org.slf4j.LoggerFactory; + import com.att.eelf.configuration.EELFLogger; import com.att.eelf.configuration.EELFManager; @@ -60,8 +59,7 @@ public class Kafka011Consumer implements Consumer { OPENED, CLOSED } - // @Autowired - // KafkaLiveLockAvoider kafkaLiveLockAvoider; + /** * KafkaConsumer() is constructor. It has following 4 parameters:- * @@ -126,7 +124,7 @@ public class Kafka011Consumer implements Consumer { records = kConsumer.poll(500); } for (ConsumerRecord record : records) { - // foundMsgs = true; + fPendingMsgs.offer(record); } @@ -139,7 +137,7 @@ public class Kafka011Consumer implements Consumer { } - // return null; + return true; } }; @@ -258,7 +256,7 @@ public class Kafka011Consumer implements Consumer { /** * setting the kafkaConsumer state to closed */ - // public synchronized boolean close() { + public boolean close() { if (getState() == Kafka011Consumer.State.CLOSED) { @@ -266,7 +264,7 @@ public class Kafka011Consumer implements Consumer { return true; } - // fConnector.shutdown(); + boolean retVal = kafkaConnectorshuttask(); return retVal; @@ -280,13 +278,13 @@ public class Kafka011Consumer implements Consumer { public Boolean call() throws Exception { try { - // System.out.println("attempt to delete " + kConsumer); + kConsumer.close(); } catch (Exception e) { log.info("@Kafka Stream shutdown erorr occurred " + getName() + " " + e); throw new Exception("@Kafka Stream shutdown erorr occurred " + getName() + " " + e); - // return false; + } log.info("Kafka connection closure with in 15 seconds by a Executors task"); @@ -360,12 +358,12 @@ public class Kafka011Consumer implements Consumer { this.state = state; } - // private ConsumerConnector fConnector; + private final String fTopic; private final String fGroup; private final String fId; private final String fLogTag; - // private final KafkaStream fStream; + private KafkaConsumer kConsumer; private long fCreateTimeMs; private long fLastTouch; @@ -374,7 +372,7 @@ public class Kafka011Consumer implements Consumer { private KafkaLiveLockAvoider2 fKafkaLiveLockAvoider; private static final EELFLogger log = EELFManager.getInstance().getLogger(Kafka011Consumer.class); private final LinkedBlockingQueue> fPendingMsgs; - //private ArrayList fconsumerList; + @Override public void commitOffsets() { if (getState() == Kafka011Consumer.State.CLOSED) { @@ -382,7 +380,7 @@ public class Kafka011Consumer implements Consumer { return; } kConsumer.commitSync(); - // fConsumer.close(); + } @@ -395,9 +393,5 @@ public class Kafka011Consumer implements Consumer { public void setConsumerCache(KafkaConsumerCache cache) { } - //@Override - //public Message nextMessage(ArrayList l) { - // TODO Auto-generated method stub - //return null; - //} + } -- cgit 1.2.3-korg