diff options
Diffstat (limited to 'src/main/java/com/att/nsa/cambria/backends/kafka/KafkaConsumer.java')
-rw-r--r-- | src/main/java/com/att/nsa/cambria/backends/kafka/KafkaConsumer.java | 245 |
1 files changed, 0 insertions, 245 deletions
diff --git a/src/main/java/com/att/nsa/cambria/backends/kafka/KafkaConsumer.java b/src/main/java/com/att/nsa/cambria/backends/kafka/KafkaConsumer.java deleted file mode 100644 index 692f093..0000000 --- a/src/main/java/com/att/nsa/cambria/backends/kafka/KafkaConsumer.java +++ /dev/null @@ -1,245 +0,0 @@ -/******************************************************************************* - * ============LICENSE_START======================================================= - * org.onap.dmaap - * ================================================================================ - * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * - *******************************************************************************/ -package com.att.nsa.cambria.backends.kafka; - -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import kafka.consumer.ConsumerIterator; -import kafka.consumer.KafkaStream; -import kafka.javaapi.consumer.ConsumerConnector; -import kafka.message.MessageAndMetadata; - -//import org.slf4j.Logger; -//import org.slf4j.LoggerFactory; - -import com.att.eelf.configuration.EELFLogger; -import com.att.eelf.configuration.EELFManager; - -import com.att.nsa.cambria.backends.Consumer; - -/** - * A consumer instance that's created per-request. These are stateless so that - * clients can connect to this service as a proxy. - * - * @author author - * - */ -public class KafkaConsumer implements Consumer { - private enum State { - OPENED, CLOSED - } - - /** - * KafkaConsumer() is constructor. It has following 4 parameters:- - * @param topic - * @param group - * @param id - * @param cc - * - */ - - public KafkaConsumer(String topic, String group, String id, ConsumerConnector cc) { - fTopic = topic; - fGroup = group; - fId = id; - fConnector = cc; - - fCreateTimeMs = System.currentTimeMillis(); - fLastTouch = fCreateTimeMs; - - fLogTag = fGroup + "(" + fId + ")/" + fTopic; - offset = 0; - - state = KafkaConsumer.State.OPENED; - - final Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); - topicCountMap.put(fTopic, 1); - final Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = fConnector - .createMessageStreams(topicCountMap); - final List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(fTopic); - fStream = streams.iterator().next(); - } - - - /** getName() method returns string type value. - * returns 3 parameters in string:- - * fTopic,fGroup,fId - * @Override - */ - public String getName() { - return fTopic + " : " + fGroup + " : " + fId; - } - - /** getCreateTimeMs() method returns long type value. - * returns fCreateTimeMs variable value - * @Override - * - */ - public long getCreateTimeMs() { - return fCreateTimeMs; - } - - /** getLastAccessMs() method returns long type value. - * returns fLastTouch variable value - * @Override - * - */ - public long getLastAccessMs() { - return fLastTouch; - } - - - /** - * nextMessage() is synchronized method that means at a time only one object can access it. - * getName() method returns String which is of type Consumer.Message - * @Override - * */ - public synchronized Consumer.Message nextMessage() { - if (getState() == KafkaConsumer.State.CLOSED) { - log.warn("nextMessage() called on closed KafkaConsumer " + getName()); - return null; - } - - try { - ConsumerIterator<byte[], byte[]> it = fStream.iterator(); - if (it.hasNext()) { - final MessageAndMetadata<byte[], byte[]> msg = it.next(); - offset = msg.offset(); - - return new Consumer.Message() { - @Override - public long getOffset() { - return msg.offset(); - } - - @Override - public String getMessage() { - return new String(msg.message()); - } - }; - } - } catch (kafka.consumer.ConsumerTimeoutException x) { - log.error(fLogTag + ": ConsumerTimeoutException in Kafka consumer; returning null & Exception at nextMessage() : " + x); - } catch (java.lang.IllegalStateException x) { - log.error(fLogTag + ": Error found next() at : " + x); - } - - return null; - } - - /** getOffset() method returns long type value. - * returns offset variable value - * @Override - * - */ - public long getOffset() { - return offset; - } - - /** commit offsets - * commitOffsets() method will be called on closed of KafkaConsumer. - * @Override - * - */ - public void commitOffsets() { - if (getState() == KafkaConsumer.State.CLOSED) { - log.warn("commitOffsets() called on closed KafkaConsumer " + getName()); - return; - } - fConnector.commitOffsets(); - } - - /** - * updating fLastTouch with current time in ms - */ - public void touch() { - fLastTouch = System.currentTimeMillis(); - } - - /** getLastTouch() method returns long type value. - * returns fLastTouch variable value - * - */ - public long getLastTouch() { - return fLastTouch; - } - - /** - * setting the kafkaConsumer state to closed - */ - public synchronized void close() { - if (getState() == KafkaConsumer.State.CLOSED) { - log.warn("close() called on closed KafkaConsumer " + getName()); - return; - } - - setState(KafkaConsumer.State.CLOSED); - fConnector.shutdown(); - } - - /** - * getConsumerGroup() returns Consumer group - * @return - */ - public String getConsumerGroup() { - return fGroup; - } - - /** - * getConsumerId returns Consumer Id - * @return - */ - public String getConsumerId() { - return fId; - } - - /** - * getState returns kafkaconsumer state - * @return - */ - private KafkaConsumer.State getState() { - return this.state; - } - - /** - * setState() sets the kafkaConsumer state - * @param state - */ - private void setState(KafkaConsumer.State state) { - this.state = state; - } - - private ConsumerConnector fConnector; - private final String fTopic; - private final String fGroup; - private final String fId; - private final String fLogTag; - private final KafkaStream<byte[], byte[]> fStream; - private long fCreateTimeMs; - private long fLastTouch; - private long offset; - private KafkaConsumer.State state; - private static final EELFLogger log = EELFManager.getInstance().getLogger(KafkaConsumer.class); - //private static final Logger log = LoggerFactory.getLogger(KafkaConsumer.class); -} |