diff options
Diffstat (limited to 'src/main/java')
4 files changed, 32 insertions, 31 deletions
diff --git a/src/main/java/org/onap/dmaap/dmf/mr/backends/kafka/Kafka011Consumer.java b/src/main/java/org/onap/dmaap/dmf/mr/backends/kafka/Kafka011Consumer.java index 93374fb..b3f3c8f 100644 --- a/src/main/java/org/onap/dmaap/dmf/mr/backends/kafka/Kafka011Consumer.java +++ b/src/main/java/org/onap/dmaap/dmf/mr/backends/kafka/Kafka011Consumer.java @@ -77,7 +77,7 @@ public class Kafka011Consumer implements Consumer { fId = id; fCreateTimeMs = System.currentTimeMillis(); fLastTouch = fCreateTimeMs; - fPendingMsgs = new LinkedBlockingQueue<ConsumerRecord<String, String>>(); + fPendingMsgs = new LinkedBlockingQueue<>(); fLogTag = fGroup + "(" + fId + ")/" + fTopic; offset = 0; state = Kafka011Consumer.State.OPENED; @@ -113,12 +113,13 @@ public class Kafka011Consumer implements Consumer { public synchronized Consumer.Message nextMessage() { try { - if (fPendingMsgs.size() > 0) { + if (fPendingMsgs.isEmpty()) { return makeMessage(fPendingMsgs.take()); } } catch (InterruptedException x) { log.warn("After size>0, pending msg take() threw InterruptedException. Ignoring. (" + x.getMessage() + ")", x); + Thread.currentThread().interrupt(); } Callable<Boolean> run = new Callable<Boolean>() { @@ -135,11 +136,10 @@ public class Kafka011Consumer implements Consumer { } } catch (KafkaException x) { - log.debug(fLogTag + ": KafkaException " + x.getMessage()); + log.debug(fLogTag + ": KafkaException ", x); } catch (java.lang.IllegalStateException | java.lang.IllegalArgumentException x) { - log.error(fLogTag + ": Illegal state/arg exception in Kafka consumer; dropping stream. " - + x.getMessage()); + log.error(fLogTag + ": Illegal state/arg exception in Kafka consumer; dropping stream. ", x); } @@ -156,25 +156,26 @@ public class Kafka011Consumer implements Consumer { future.get(consumerPollTimeOut, TimeUnit.SECONDS); // wait 1 // second } catch (TimeoutException ex) { + log.error("TimeoutException in in Kafka consumer ", ex); // timed out. Try to stop the code if possible. String apiNodeId = null; try { apiNodeId = InetAddress.getLocalHost().getCanonicalHostName() + ":" + CambriaConstants.kDefault_Port; } catch (UnknownHostException e1) { - // TODO Auto-generated catch block - log.error("unable to get the localhost address"); + log.error("unable to get the localhost address ", e1); } try { if (fKafkaLiveLockAvoider != null) fKafkaLiveLockAvoider.unlockConsumerGroup(apiNodeId, fTopic + "::" + fGroup); } catch (Exception e) { - log.error("unlockConsumerGroup(" + apiNodeId + "," + fTopic + "::" + fGroup); + log.error("Exception in unlockConsumerGroup(" + apiNodeId + "," + fTopic + "::" + fGroup, e); } forcePollOnConsumer(); future.cancel(true); } catch (Exception ex) { + log.error("Exception in in Kafka consumer ", ex); // timed out. Try to stop the code if possible. future.cancel(true); } @@ -307,13 +308,12 @@ public class Kafka011Consumer implements Consumer { // second } catch (TimeoutException ex) { // timed out. Try to stop the code if possible. - log.info("Timeout Occured - Kafka connection closure with in 300 seconds by a Executors task"); + log.info("Timeout Occured - Kafka connection closure with in 300 seconds by a Executors task ", ex); future.cancel(true); setState(Kafka011Consumer.State.OPENED); } catch (Exception ex) { // timed out. Try to stop the code if possible. - log.error("Exception occured Occured - Kafka connection closure with in 300 seconds by a Executors task" - + ex); + log.error("Exception Occured - Kafka connection closure with in 300 seconds by a Executors task ", ex); future.cancel(true); setState(Kafka011Consumer.State.OPENED); return false; diff --git a/src/main/java/org/onap/dmaap/dmf/mr/backends/kafka/Kafka011ConsumerUtil.java b/src/main/java/org/onap/dmaap/dmf/mr/backends/kafka/Kafka011ConsumerUtil.java index a93ac33..9f8f26d 100644 --- a/src/main/java/org/onap/dmaap/dmf/mr/backends/kafka/Kafka011ConsumerUtil.java +++ b/src/main/java/org/onap/dmaap/dmf/mr/backends/kafka/Kafka011ConsumerUtil.java @@ -72,7 +72,7 @@ public class Kafka011ConsumerUtil { } } catch (Exception e) { - log.error("Failed and go to Exception block for " + fGroup + " " + e.getMessage()); + log.error("Failed and go to Exception block for " + fGroup +" ", e); } } }); @@ -108,9 +108,9 @@ public class Kafka011ConsumerUtil { } } catch (java.util.ConcurrentModificationException e) { - log.error("Error occurs for " + e); + log.error("Error occurs for ", e); } catch (Exception e) { - log.error("Failed and go to Exception block for " + group + " " + e.getMessage()); + log.error("Failed and go to Exception block for " + group + " ", e); } } }); diff --git a/src/main/java/org/onap/dmaap/dmf/mr/backends/kafka/KafkaConsumerCache.java b/src/main/java/org/onap/dmaap/dmf/mr/backends/kafka/KafkaConsumerCache.java index a38d77b..04d1d9e 100644 --- a/src/main/java/org/onap/dmaap/dmf/mr/backends/kafka/KafkaConsumerCache.java +++ b/src/main/java/org/onap/dmaap/dmf/mr/backends/kafka/KafkaConsumerCache.java @@ -320,9 +320,9 @@ public class KafkaConsumerCache { curatorConsumerCache.close(); log.info("Curator client closed"); } catch (ZkInterruptedException e) { - log.warn("Curator client close interrupted: " + e.getMessage()); + log.warn("Curator client close interrupted: ", e); } catch (IOException e) { - log.warn("Error while closing curator PathChildrenCache for KafkaConsumerCache" + e.getMessage()); + log.warn("Error while closing curator PathChildrenCache for KafkaConsumerCache ", e); } curatorConsumerCache = null; @@ -497,10 +497,10 @@ public class KafkaConsumerCache { log.info(" ^ deleted " + fBaseZkPath + "/" + key); } catch (NoNodeException e) { log.warn("A consumer was deleted from " + fApiId - + "'s cache, but no Cambria API node had ownership of it in ZooKeeper"); + + "'s cache, but no Cambria API node had ownership of it in ZooKeeper ", e); } catch (Exception e) { - log.debug("Unexpected exception while deleting consumer: " + e.getMessage()); - log.info(" %%%%%%@# Unexpected exception while deleting consumer: " + e.getMessage()); + log.debug("Unexpected exception while deleting consumer: ", e); + log.info(" %%%%%%@# Unexpected exception while deleting consumer: ", e); } try { @@ -648,6 +648,7 @@ public class KafkaConsumerCache { try { curator.setData().forPath(consumerPath, fApiId.getBytes()); } catch (KeeperException.NoNodeException e) { + log.info("KeeperException.NoNodeException occured", e); curator.create().creatingParentsIfNeeded().forPath(consumerPath, fApiId.getBytes()); } log.info(fApiId + " successfully claimed ownership of consumer " + consumerKey); diff --git a/src/main/java/org/onap/dmaap/dmf/mr/service/impl/EventsServiceImpl.java b/src/main/java/org/onap/dmaap/dmf/mr/service/impl/EventsServiceImpl.java index 387b667..2f436ec 100644 --- a/src/main/java/org/onap/dmaap/dmf/mr/service/impl/EventsServiceImpl.java +++ b/src/main/java/org/onap/dmaap/dmf/mr/service/impl/EventsServiceImpl.java @@ -637,7 +637,7 @@ public class EventsServiceImpl implements EventsService { // final KeyedMessage<String, String> data = new // KeyedMessage<String, String>(topic, m.getKey(), - // kms.add(data); + final ProducerRecord<String, String> data = new ProducerRecord<String, String>(topic, m.getKey(), m.getMessage()); @@ -650,7 +650,7 @@ public class EventsServiceImpl implements EventsService { + batchId + "]"); try { // ctx.getConfigReader().getfPublisher().sendBatchMessage(topic, - // kms); + ctx.getConfigReader().getfPublisher().sendBatchMessageNew(topic, pms); // transactionLogs(batch); for (message msg : batch) { @@ -681,7 +681,7 @@ public class EventsServiceImpl implements EventsService { metricsSet.publishTick(sizeNow); publishBatchCount = sizeNow; count += sizeNow; - // batchId++; + String endTime = sdf.format(new Date()); LOG.info("Batch End Details:[serverIp=" + ctx.getRequest().getLocalAddr() + ",Batch End Id=" + batchId + ",Batch Total=" + publishBatchCount + ",Batch Start Time=" + startTime @@ -698,9 +698,9 @@ public class EventsServiceImpl implements EventsService { + batchId + "]"); try { // ctx.getConfigReader().getfPublisher().sendBatchMessage(topic, - // kms); + ctx.getConfigReader().getfPublisher().sendBatchMessageNew(topic, pms); - // transactionLogs(batch); + for (message msg : batch) { LogDetails logDetails = msg.getLogDetails(); LOG.info("Publisher Log Details : " + logDetails.getPublisherLogDetails()); @@ -727,7 +727,7 @@ public class EventsServiceImpl implements EventsService { pms.clear(); metricsSet.publishTick(sizeNow); count += sizeNow; - // batchId++; + String endTime = sdf.format(new Date()); publishBatchCount = sizeNow; LOG.info("Batch End Details:[serverIp=" + ctx.getRequest().getLocalAddr() + ",Batch End Id=" + batchId @@ -855,12 +855,12 @@ public class EventsServiceImpl implements EventsService { return logDetails; } - /* - * public String getMetricsTopic() { return metricsTopic; } - * - * public void setMetricsTopic(String metricsTopic) { this.metricsTopic = - * metricsTopic; } - */ + + + + + + |