diff options
Diffstat (limited to 'src/main/java/org/onap/dmaap/dmf/mr/backends/kafka/Kafka011Consumer.java')
-rw-r--r-- | src/main/java/org/onap/dmaap/dmf/mr/backends/kafka/Kafka011Consumer.java | 394 |
1 files changed, 394 insertions, 0 deletions
diff --git a/src/main/java/org/onap/dmaap/dmf/mr/backends/kafka/Kafka011Consumer.java b/src/main/java/org/onap/dmaap/dmf/mr/backends/kafka/Kafka011Consumer.java new file mode 100644 index 0000000..4d6c81c --- /dev/null +++ b/src/main/java/org/onap/dmaap/dmf/mr/backends/kafka/Kafka011Consumer.java @@ -0,0 +1,394 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 +* + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ +package org.onap.dmaap.dmf.mr.backends.kafka; + +import com.att.ajsc.filemonitor.AJSCPropertiesMap; +import com.att.eelf.configuration.EELFLogger; +import com.att.eelf.configuration.EELFManager; +import org.apache.commons.lang.StringUtils; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.KafkaException; +import org.onap.dmaap.dmf.mr.backends.Consumer; +import org.onap.dmaap.dmf.mr.constants.CambriaConstants; + +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.Arrays; +import java.util.concurrent.*; + +/** + * A consumer instance that's created per-request. These are stateless so that + * clients can connect to this service as a proxy. + * + * @author Ram + * + */ +public class Kafka011Consumer implements Consumer { + private enum State { + OPENED, CLOSED + } + + + /** + * KafkaConsumer() is constructor. It has following 4 parameters:- + * + * @param topic + * @param group + * @param id + * @param cc + * + */ + + public Kafka011Consumer(String topic, String group, String id, KafkaConsumer<String, String> cc, + KafkaLiveLockAvoider2 klla) throws Exception { + fTopic = topic; + fGroup = group; + fId = id; + fCreateTimeMs = System.currentTimeMillis(); + fLastTouch = fCreateTimeMs; + fPendingMsgs = new LinkedBlockingQueue<>(); + fLogTag = fGroup + "(" + fId + ")/" + fTopic; + offset = 0; + state = State.OPENED; + kConsumer = cc; + fKafkaLiveLockAvoider = klla; + + String consumerTimeOut = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, + "consumer.timeout"); + if (StringUtils.isNotEmpty(consumerTimeOut)) { + consumerPollTimeOut = Integer.parseInt(consumerTimeOut); + } + synchronized (kConsumer) { + kConsumer.subscribe(Arrays.asList(topic)); + } + } + + private Message makeMessage(final ConsumerRecord<String, String> msg) { + return new Message() { + @Override + public long getOffset() { + offset = msg.offset(); + return offset; + } + + @Override + public String getMessage() { + return new String(msg.value()); + } + }; + } + + @Override + public synchronized Message nextMessage() { + + try { + if (!fPendingMsgs.isEmpty()) { + return makeMessage(fPendingMsgs.take()); + } + } catch (InterruptedException x) { + log.warn("After size>0, pending msg take() threw InterruptedException. Ignoring. (" + x.getMessage() + ")", + x); + Thread.currentThread().interrupt(); + } + + Callable<Boolean> run = new Callable<Boolean>() { + @Override + public Boolean call() throws Exception { + try { + ConsumerRecords<String, String> records; + synchronized (kConsumer) { + records = kConsumer.poll(500); + } + for (ConsumerRecord<String, String> record : records) { + + fPendingMsgs.offer(record); + } + + } catch (KafkaException x) { + log.debug(fLogTag + ": KafkaException ", x); + + } catch (IllegalStateException | IllegalArgumentException x) { + log.error(fLogTag + ": Illegal state/arg exception in Kafka consumer; dropping stream. ", x); + + } + + + return true; + } + }; + + @SuppressWarnings({ "rawtypes", "unchecked" }) + RunnableFuture future = new FutureTask(run); + ExecutorService service = Executors.newSingleThreadExecutor(); + service.execute(future); + try { + future.get(consumerPollTimeOut, TimeUnit.SECONDS); // wait 1 + // second + } catch (TimeoutException ex) { + log.error("TimeoutException in in Kafka consumer ", ex); + // timed out. Try to stop the code if possible. + String apiNodeId = null; + try { + apiNodeId = InetAddress.getLocalHost().getCanonicalHostName() + ":" + CambriaConstants.kDefault_Port; + } catch (UnknownHostException e1) { + log.error("unable to get the localhost address ", e1); + } + + try { + if (fKafkaLiveLockAvoider != null) + fKafkaLiveLockAvoider.unlockConsumerGroup(apiNodeId, fTopic + "::" + fGroup); + } catch (Exception e) { + log.error("Exception in unlockConsumerGroup(" + apiNodeId + "," + fTopic + "::" + fGroup, e); + } + + forcePollOnConsumer(); + future.cancel(true); + } catch (Exception ex) { + log.error("Exception in in Kafka consumer ", ex); + // timed out. Try to stop the code if possible. + future.cancel(true); + } + service.shutdown(); + + return null; + + } + + /** + * getName() method returns string type value. returns 3 parameters in + * string:- fTopic,fGroup,fId + * + * @Override + */ + public String getName() { + return fTopic + " : " + fGroup + " : " + fId; + } + + /** + * getCreateTimeMs() method returns long type value. returns fCreateTimeMs + * variable value + * + * @Override + * + */ + public long getCreateTimeMs() { + return fCreateTimeMs; + } + + public KafkaConsumer<String, String> getConsumer() { + return kConsumer; + } + + /** + * getLastAccessMs() method returns long type value. returns fLastTouch + * variable value + * + * @Override + * + */ + public long getLastAccessMs() { + return fLastTouch; + } + + /** + * getOffset() method returns long type value. returns offset variable value + * + * @Override + * + */ + public long getOffset() { + return offset; + } + + /** + * commit offsets commitOffsets() method will be called on closed of + * KafkaConsumer. + * + * @Override + * + * + * public void commitOffsets() { if (getState() == + * KafkaConsumer.State.CLOSED) { log.warn("commitOffsets() called + * on closed KafkaConsumer " + getName()); return; } + * fConnector.commitOffsets(); } + */ + + /** + * updating fLastTouch with current time in ms + */ + public void touch() { + fLastTouch = System.currentTimeMillis(); + } + + /** + * getLastTouch() method returns long type value. returns fLastTouch + * variable value + * + */ + public long getLastTouch() { + return fLastTouch; + } + + /** + * setting the kafkaConsumer state to closed + */ + + public boolean close() { + if (getState() == State.CLOSED) { + + log.error("close() called on closed KafkaConsumer " + getName()); + return true; + } + + + boolean retVal = kafkaConnectorshuttask(); + return retVal; + + } + + /* time out if the kafka shutdown fails for some reason */ + + private boolean kafkaConnectorshuttask() { + Callable<Boolean> run = new Callable<Boolean>() { + @Override + public Boolean call() throws Exception { + + try { + + kConsumer.close(); + + } catch (Exception e) { + log.info("@Kafka Stream shutdown erorr occurred " + getName() + " " + e); + throw new Exception("@Kafka Stream shutdown erorr occurred " + getName() + " " + e); + + } + log.info("Kafka connection closure with in 15 seconds by a Executors task"); + + return true; + } + }; + + @SuppressWarnings({ "rawtypes", "unchecked" }) + RunnableFuture future = new FutureTask(run); + ExecutorService service = Executors.newSingleThreadExecutor(); + service.execute(future); + try { + future.get(200, TimeUnit.SECONDS); // wait 1 + // second + } catch (TimeoutException ex) { + // timed out. Try to stop the code if possible. + log.info("Timeout Occured - Kafka connection closure with in 300 seconds by a Executors task ", ex); + future.cancel(true); + setState(State.OPENED); + } catch (Exception ex) { + // timed out. Try to stop the code if possible. + log.error("Exception Occured - Kafka connection closure with in 300 seconds by a Executors task ", ex); + future.cancel(true); + setState(State.OPENED); + return false; + } + service.shutdown(); + setState(State.CLOSED); + return true; + } + + public void forcePollOnConsumer() { + Kafka011ConsumerUtil.forcePollOnConsumer(fTopic, fGroup, fId); + + } + + /** + * getConsumerGroup() returns Consumer group + * + * @return + */ + public String getConsumerGroup() { + return fGroup; + } + + /** + * getConsumerId returns Consumer Id + * + * @return + */ + public String getConsumerId() { + return fId; + } + + /** + * getState returns kafkaconsumer state + * + * @return + */ + private State getState() { + return this.state; + } + + /** + * setState() sets the kafkaConsumer state + * + * @param state + */ + private void setState(State state) { + this.state = state; + } + + + private final String fTopic; + private final String fGroup; + private final String fId; + private final String fLogTag; + + private KafkaConsumer<String, String> kConsumer; + private long fCreateTimeMs; + private long fLastTouch; + private long offset; + private State state; + private KafkaLiveLockAvoider2 fKafkaLiveLockAvoider; + private int consumerPollTimeOut=5; + private static final EELFLogger log = EELFManager.getInstance().getLogger(Kafka011Consumer.class); + private final LinkedBlockingQueue<ConsumerRecord<String, String>> fPendingMsgs; + + @Override + public void commitOffsets() { + if (getState() == State.CLOSED) { + log.warn("commitOffsets() called on closed KafkaConsumer " + getName()); + return; + } + kConsumer.commitSync(); + + + } + + @Override + public void setOffset(long offsetval) { + offset = offsetval; + } + + + public void setConsumerCache(KafkaConsumerCache cache) { + } + + +} |