diff options
author | Sumapriya Sarvepalli <ss00493505@techmahindra.com> | 2018-08-17 16:12:52 +0530 |
---|---|---|
committer | Sumapriya Sarvepalli <ss00493505@techmahindra.com> | 2018-08-17 16:12:52 +0530 |
commit | e5f840ed7d6c284fad4834e73715058c4f98ce90 (patch) | |
tree | 238fc3a0fca8198c127409a24035ccadb47a4074 /src/main | |
parent | ad99b28f0481a49a9f58e1ac2d03b47d9cd4e0af (diff) |
Sonar Major issues
This block of commented-out lines of code should be removed
Sonar Link:
https://sonar.onap.org/project/issues?assignees=sumapriya&id=org.onap.dmaap.messagerouter.msgrtr%3Amsgrtr&open=AWU41WkrwGn37JfbyH0E&resolved=false
Location:
src/main/java/com/att/dmf/mr/backends/kafka/Kafka011Consumer.java
Line No:L45 L64 L129 L142 L261 L269 L283 L289 L363 L368 L377 L385 L399 L401
Change-Id: Id747f017cc0f09694181ce410c697895fcfafafa
Issue-ID: DMAAP-610
Signed-off-by: Sumapriya Sarvepalli <ss00493505@techmahindra.com>
Diffstat (limited to 'src/main')
-rw-r--r-- | src/main/java/com/att/dmf/mr/backends/kafka/Kafka011Consumer.java | 32 |
1 files changed, 13 insertions, 19 deletions
diff --git a/src/main/java/com/att/dmf/mr/backends/kafka/Kafka011Consumer.java b/src/main/java/com/att/dmf/mr/backends/kafka/Kafka011Consumer.java index f7f5ba7..6cd0230 100644 --- a/src/main/java/com/att/dmf/mr/backends/kafka/Kafka011Consumer.java +++ b/src/main/java/com/att/dmf/mr/backends/kafka/Kafka011Consumer.java @@ -42,8 +42,7 @@ import org.apache.kafka.common.KafkaException; import com.att.dmf.mr.backends.Consumer; import com.att.dmf.mr.constants.CambriaConstants; -//import org.slf4j.Logger; -//import org.slf4j.LoggerFactory; + import com.att.eelf.configuration.EELFLogger; import com.att.eelf.configuration.EELFManager; @@ -60,8 +59,7 @@ public class Kafka011Consumer implements Consumer { OPENED, CLOSED } - // @Autowired - // KafkaLiveLockAvoider kafkaLiveLockAvoider; + /** * KafkaConsumer() is constructor. It has following 4 parameters:- * @@ -126,7 +124,7 @@ public class Kafka011Consumer implements Consumer { records = kConsumer.poll(500); } for (ConsumerRecord<String, String> record : records) { - // foundMsgs = true; + fPendingMsgs.offer(record); } @@ -139,7 +137,7 @@ public class Kafka011Consumer implements Consumer { } - // return null; + return true; } }; @@ -258,7 +256,7 @@ public class Kafka011Consumer implements Consumer { /** * setting the kafkaConsumer state to closed */ - // public synchronized boolean close() { + public boolean close() { if (getState() == Kafka011Consumer.State.CLOSED) { @@ -266,7 +264,7 @@ public class Kafka011Consumer implements Consumer { return true; } - // fConnector.shutdown(); + boolean retVal = kafkaConnectorshuttask(); return retVal; @@ -280,13 +278,13 @@ public class Kafka011Consumer implements Consumer { public Boolean call() throws Exception { try { - // System.out.println("attempt to delete " + kConsumer); + kConsumer.close(); } catch (Exception e) { log.info("@Kafka Stream shutdown erorr occurred " + getName() + " " + e); throw new Exception("@Kafka Stream shutdown erorr occurred " + getName() + " " + e); - // return false; + } log.info("Kafka connection closure with in 15 seconds by a Executors task"); @@ -360,12 +358,12 @@ public class Kafka011Consumer implements Consumer { this.state = state; } - // private ConsumerConnector fConnector; + private final String fTopic; private final String fGroup; private final String fId; private final String fLogTag; - // private final KafkaStream<byte[], byte[]> fStream; + private KafkaConsumer<String, String> kConsumer; private long fCreateTimeMs; private long fLastTouch; @@ -374,7 +372,7 @@ public class Kafka011Consumer implements Consumer { private KafkaLiveLockAvoider2 fKafkaLiveLockAvoider; private static final EELFLogger log = EELFManager.getInstance().getLogger(Kafka011Consumer.class); private final LinkedBlockingQueue<ConsumerRecord<String, String>> fPendingMsgs; - //private ArrayList<Kafka011Consumer> fconsumerList; + @Override public void commitOffsets() { if (getState() == Kafka011Consumer.State.CLOSED) { @@ -382,7 +380,7 @@ public class Kafka011Consumer implements Consumer { return; } kConsumer.commitSync(); - // fConsumer.close(); + } @@ -395,9 +393,5 @@ public class Kafka011Consumer implements Consumer { public void setConsumerCache(KafkaConsumerCache cache) { } - //@Override - //public Message nextMessage(ArrayList<?> l) { - // TODO Auto-generated method stub - //return null; - //} + } |