diff options
author | sunil unnava <sunil.unnava@att.com> | 2018-10-23 12:18:59 -0400 |
---|---|---|
committer | sunil unnava <sunil.unnava@att.com> | 2018-10-23 12:22:02 -0400 |
commit | 3504265229c589ecc166e3ad4c33bb198b11e4ce (patch) | |
tree | 022235018aa3ad863eaf24862543bbd509f35a21 /src/main/java/com/att/dmf/mr/backends/kafka/Kafka011Consumer.java | |
parent | 8a3dfd3fe521f18ce07c2d24202a51b28d424fa2 (diff) |
update the package name1.1.11
Issue-ID: DMAAP-858
Change-Id: I49ae6eb9c51a261b64b911e607fcbbca46c5423c
Signed-off-by: sunil unnava <sunil.unnava@att.com>
Diffstat (limited to 'src/main/java/com/att/dmf/mr/backends/kafka/Kafka011Consumer.java')
-rw-r--r-- | src/main/java/com/att/dmf/mr/backends/kafka/Kafka011Consumer.java | 397 |
1 files changed, 0 insertions, 397 deletions
diff --git a/src/main/java/com/att/dmf/mr/backends/kafka/Kafka011Consumer.java b/src/main/java/com/att/dmf/mr/backends/kafka/Kafka011Consumer.java deleted file mode 100644 index 9be9073..0000000 --- a/src/main/java/com/att/dmf/mr/backends/kafka/Kafka011Consumer.java +++ /dev/null @@ -1,397 +0,0 @@ -/******************************************************************************* - * ============LICENSE_START======================================================= - * org.onap.dmaap - * ================================================================================ - * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 -* - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * - *******************************************************************************/ -package com.att.dmf.mr.backends.kafka; - -import java.net.InetAddress; -import java.net.UnknownHostException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.FutureTask; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.RunnableFuture; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; - -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.clients.consumer.ConsumerRecords; -import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.common.KafkaException; - -import com.att.dmf.mr.backends.Consumer; -import com.att.dmf.mr.constants.CambriaConstants; - - - -import com.att.eelf.configuration.EELFLogger; -import com.att.eelf.configuration.EELFManager; - -/** - * A consumer instance that's created per-request. These are stateless so that - * clients can connect to this service as a proxy. - * - * @author Ram - * - */ -public class Kafka011Consumer implements Consumer { - private enum State { - OPENED, CLOSED - } - - - /** - * KafkaConsumer() is constructor. It has following 4 parameters:- - * - * @param topic - * @param group - * @param id - * @param cc - * - */ - - public Kafka011Consumer(String topic, String group, String id, KafkaConsumer<String, String> cc, - KafkaLiveLockAvoider2 klla) throws Exception { - fTopic = topic; - fGroup = group; - fId = id; - fCreateTimeMs = System.currentTimeMillis(); - fLastTouch = fCreateTimeMs; - fPendingMsgs = new LinkedBlockingQueue<ConsumerRecord<String, String>>(); - fLogTag = fGroup + "(" + fId + ")/" + fTopic; - offset = 0; - state = Kafka011Consumer.State.OPENED; - kConsumer = cc; - fKafkaLiveLockAvoider = klla; - synchronized (kConsumer) { - kConsumer.subscribe(Arrays.asList(topic)); - } - } - - private Consumer.Message makeMessage(final ConsumerRecord<String, String> msg) { - return new Consumer.Message() { - @Override - public long getOffset() { - offset = msg.offset(); - return offset; - } - - @Override - public String getMessage() { - return new String(msg.value()); - } - }; - } - - @Override - public synchronized Consumer.Message nextMessage() { - - try { - if (fPendingMsgs.size() > 0) { - return makeMessage(fPendingMsgs.take()); - } - } catch (InterruptedException x) { - log.warn("After size>0, pending msg take() threw InterruptedException. Ignoring. (" + x.getMessage() + ")", - x); - } - - Callable<Boolean> run = new Callable<Boolean>() { - @Override - public Boolean call() throws Exception { - try { - ConsumerRecords<String, String> records; - synchronized (kConsumer) { - records = kConsumer.poll(500); - } - for (ConsumerRecord<String, String> record : records) { - - fPendingMsgs.offer(record); - } - - } catch (KafkaException x) { - log.debug(fLogTag + ": KafkaException " + x.getMessage()); - - } catch (java.lang.IllegalStateException | java.lang.IllegalArgumentException x) { - log.error(fLogTag + ": Illegal state/arg exception in Kafka consumer; dropping stream. " - + x.getMessage()); - - } - - - return true; - } - }; - - @SuppressWarnings({ "rawtypes", "unchecked" }) - RunnableFuture future = new FutureTask(run); - ExecutorService service = Executors.newSingleThreadExecutor(); - service.execute(future); - try { - future.get(5, TimeUnit.SECONDS); // wait 1 - // second - } catch (TimeoutException ex) { - // timed out. Try to stop the code if possible. - String apiNodeId = null; - try { - apiNodeId = InetAddress.getLocalHost().getCanonicalHostName() + ":" + CambriaConstants.kDefault_Port; - } catch (UnknownHostException e1) { - // TODO Auto-generated catch block - log.error("unable to get the localhost address"); - } - - try { - if (fKafkaLiveLockAvoider != null) - fKafkaLiveLockAvoider.unlockConsumerGroup(apiNodeId, fTopic + "::" + fGroup); - } catch (Exception e) { - log.error("unlockConsumerGroup(" + apiNodeId + "," + fTopic + "::" + fGroup); - } - - forcePollOnConsumer(); - future.cancel(true); - } catch (Exception ex) { - // timed out. Try to stop the code if possible. - future.cancel(true); - } - service.shutdown(); - - return null; - - } - - /** - * getName() method returns string type value. returns 3 parameters in - * string:- fTopic,fGroup,fId - * - * @Override - */ - public String getName() { - return fTopic + " : " + fGroup + " : " + fId; - } - - /** - * getCreateTimeMs() method returns long type value. returns fCreateTimeMs - * variable value - * - * @Override - * - */ - public long getCreateTimeMs() { - return fCreateTimeMs; - } - - public org.apache.kafka.clients.consumer.KafkaConsumer<String, String> getConsumer() { - return kConsumer; - } - - /** - * getLastAccessMs() method returns long type value. returns fLastTouch - * variable value - * - * @Override - * - */ - public long getLastAccessMs() { - return fLastTouch; - } - - /** - * getOffset() method returns long type value. returns offset variable value - * - * @Override - * - */ - public long getOffset() { - return offset; - } - - /** - * commit offsets commitOffsets() method will be called on closed of - * KafkaConsumer. - * - * @Override - * - * - * public void commitOffsets() { if (getState() == - * KafkaConsumer.State.CLOSED) { log.warn("commitOffsets() called - * on closed KafkaConsumer " + getName()); return; } - * fConnector.commitOffsets(); } - */ - - /** - * updating fLastTouch with current time in ms - */ - public void touch() { - fLastTouch = System.currentTimeMillis(); - } - - /** - * getLastTouch() method returns long type value. returns fLastTouch - * variable value - * - */ - public long getLastTouch() { - return fLastTouch; - } - - /** - * setting the kafkaConsumer state to closed - */ - - public boolean close() { - if (getState() == Kafka011Consumer.State.CLOSED) { - - log.error("close() called on closed KafkaConsumer " + getName()); - return true; - } - - - boolean retVal = kafkaConnectorshuttask(); - return retVal; - - } - - /* time out if the kafka shutdown fails for some reason */ - - private boolean kafkaConnectorshuttask() { - Callable<Boolean> run = new Callable<Boolean>() { - @Override - public Boolean call() throws Exception { - - try { - - kConsumer.close(); - - } catch (Exception e) { - log.info("@Kafka Stream shutdown erorr occurred " + getName() + " " + e); - throw new Exception("@Kafka Stream shutdown erorr occurred " + getName() + " " + e); - - } - log.info("Kafka connection closure with in 15 seconds by a Executors task"); - - return true; - } - }; - - @SuppressWarnings({ "rawtypes", "unchecked" }) - RunnableFuture future = new FutureTask(run); - ExecutorService service = Executors.newSingleThreadExecutor(); - service.execute(future); - try { - future.get(200, TimeUnit.SECONDS); // wait 1 - // second - } catch (TimeoutException ex) { - // timed out. Try to stop the code if possible. - log.info("Timeout Occured - Kafka connection closure with in 300 seconds by a Executors task"); - future.cancel(true); - setState(Kafka011Consumer.State.OPENED); - } catch (Exception ex) { - // timed out. Try to stop the code if possible. - log.error("Exception occured Occured - Kafka connection closure with in 300 seconds by a Executors task" - + ex); - future.cancel(true); - setState(Kafka011Consumer.State.OPENED); - return false; - } - service.shutdown(); - setState(Kafka011Consumer.State.CLOSED); - return true; - } - - public void forcePollOnConsumer() { - Kafka011ConsumerUtil.forcePollOnConsumer(fTopic, fGroup, fId); - - } - - /** - * getConsumerGroup() returns Consumer group - * - * @return - */ - public String getConsumerGroup() { - return fGroup; - } - - /** - * getConsumerId returns Consumer Id - * - * @return - */ - public String getConsumerId() { - return fId; - } - - /** - * getState returns kafkaconsumer state - * - * @return - */ - private Kafka011Consumer.State getState() { - return this.state; - } - - /** - * setState() sets the kafkaConsumer state - * - * @param state - */ - private void setState(Kafka011Consumer.State state) { - this.state = state; - } - - - private final String fTopic; - private final String fGroup; - private final String fId; - private final String fLogTag; - - private KafkaConsumer<String, String> kConsumer; - private long fCreateTimeMs; - private long fLastTouch; - private long offset; - private Kafka011Consumer.State state; - private KafkaLiveLockAvoider2 fKafkaLiveLockAvoider; - private static final EELFLogger log = EELFManager.getInstance().getLogger(Kafka011Consumer.class); - private final LinkedBlockingQueue<ConsumerRecord<String, String>> fPendingMsgs; - - @Override - public void commitOffsets() { - if (getState() == Kafka011Consumer.State.CLOSED) { - log.warn("commitOffsets() called on closed KafkaConsumer " + getName()); - return; - } - kConsumer.commitSync(); - - - } - - @Override - public void setOffset(long offsetval) { - offset = offsetval; - } - - - public void setConsumerCache(KafkaConsumerCache cache) { - } - - -} |