From b32effcaf5684d5e2f338a4537b71a2375c534e5 Mon Sep 17 00:00:00 2001 From: sunil unnava Date: Tue, 14 Aug 2018 09:34:46 -0400 Subject: update the testcases after the kafka 11 changes Issue-ID: DMAAP-526 Change-Id: I477a8ee05fb3cdd76af726b6ca0d1a69aa9eef93 Signed-off-by: sunil unnava --- .../java/com/att/dmf/mr/backends/Consumer.java | 106 +++ .../com/att/dmf/mr/backends/ConsumerFactory.java | 119 ++++ .../java/com/att/dmf/mr/backends/MetricsSet.java | 71 ++ .../java/com/att/dmf/mr/backends/Publisher.java | 101 +++ .../dmf/mr/backends/kafka/Kafka011Consumer.java | 403 +++++++++++ .../mr/backends/kafka/Kafka011ConsumerUtil.java | 126 ++++ .../att/dmf/mr/backends/kafka/KafkaConsumer.txt | 386 +++++++++++ .../dmf/mr/backends/kafka/KafkaConsumerCache.java | 749 +++++++++++++++++++++ .../mr/backends/kafka/KafkaLiveLockAvoider2.java | 161 +++++ .../att/dmf/mr/backends/kafka/KafkaPublisher.java | 235 +++++++ .../dmf/mr/backends/kafka/LiveLockAvoidance.java | 45 ++ .../mr/backends/kafka/LockInstructionWatcher.java | 100 +++ .../mr/backends/memory/MemoryConsumerFactory.java | 182 +++++ .../dmf/mr/backends/memory/MemoryMetaBroker.java | 198 ++++++ .../att/dmf/mr/backends/memory/MemoryQueue.java | 207 ++++++ .../mr/backends/memory/MemoryQueuePublisher.java | 92 +++ .../att/dmf/mr/backends/memory/MessageLogger.java | 109 +++ 17 files changed, 3390 insertions(+) create mode 100644 src/main/java/com/att/dmf/mr/backends/Consumer.java create mode 100644 src/main/java/com/att/dmf/mr/backends/ConsumerFactory.java create mode 100644 src/main/java/com/att/dmf/mr/backends/MetricsSet.java create mode 100644 src/main/java/com/att/dmf/mr/backends/Publisher.java create mode 100644 src/main/java/com/att/dmf/mr/backends/kafka/Kafka011Consumer.java create mode 100644 src/main/java/com/att/dmf/mr/backends/kafka/Kafka011ConsumerUtil.java create mode 100644 src/main/java/com/att/dmf/mr/backends/kafka/KafkaConsumer.txt create mode 100644 src/main/java/com/att/dmf/mr/backends/kafka/KafkaConsumerCache.java create mode 100644 src/main/java/com/att/dmf/mr/backends/kafka/KafkaLiveLockAvoider2.java create mode 100644 src/main/java/com/att/dmf/mr/backends/kafka/KafkaPublisher.java create mode 100644 src/main/java/com/att/dmf/mr/backends/kafka/LiveLockAvoidance.java create mode 100644 src/main/java/com/att/dmf/mr/backends/kafka/LockInstructionWatcher.java create mode 100644 src/main/java/com/att/dmf/mr/backends/memory/MemoryConsumerFactory.java create mode 100644 src/main/java/com/att/dmf/mr/backends/memory/MemoryMetaBroker.java create mode 100644 src/main/java/com/att/dmf/mr/backends/memory/MemoryQueue.java create mode 100644 src/main/java/com/att/dmf/mr/backends/memory/MemoryQueuePublisher.java create mode 100644 src/main/java/com/att/dmf/mr/backends/memory/MessageLogger.java (limited to 'src/main/java/com/att/dmf/mr/backends') diff --git a/src/main/java/com/att/dmf/mr/backends/Consumer.java b/src/main/java/com/att/dmf/mr/backends/Consumer.java new file mode 100644 index 0000000..279d48b --- /dev/null +++ b/src/main/java/com/att/dmf/mr/backends/Consumer.java @@ -0,0 +1,106 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 +* + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ +package com.att.dmf.mr.backends; + +import java.util.ArrayList; + +/** + * A consumer interface. Consumers pull the next message from a given topic. + * @author peter + */ +public interface Consumer +{ + /** + * A message interface provide the offset and message + * @author nilanjana.maity + * + */ + public interface Message + { + /** + * returning the offset of that particular message + * @return long + */ + long getOffset (); + /** + * returning the message + * @return message + */ + String getMessage (); + } + + /** + * Get this consumer's name + * @return name + */ + String getName (); + + /** + * Get creation time in ms + * @return + */ + long getCreateTimeMs (); + + /** + * Get last access time in ms + * @return + */ + long getLastAccessMs (); + + /** + * Get the next message from this source. This method must not block. + * @return the next message, or null if none are waiting + */ + Message nextMessage (); + + /** + * Get the next message from this source. This method must not block. + * @param atOffset start with the next message at or after atOffset. -1 means next from last request + * @return the next message, or null if none are waiting + */ +// Message nextMessage ( long atOffset ); + + //Message nextMessage (ArrayList cl); + /** + * Close/clean up this consumer + * @return + */ + boolean close(); + + /** + * Commit the offset of the last consumed message + * + */ + void commitOffsets(); + + /** + * Get the offset this consumer is currently at + * @return offset + */ + long getOffset(); + + void setOffset(long offset); + + //public org.apache.kafka.clients.consumer.KafkaConsumer getConsumer(); + + +} diff --git a/src/main/java/com/att/dmf/mr/backends/ConsumerFactory.java b/src/main/java/com/att/dmf/mr/backends/ConsumerFactory.java new file mode 100644 index 0000000..0b684fe --- /dev/null +++ b/src/main/java/com/att/dmf/mr/backends/ConsumerFactory.java @@ -0,0 +1,119 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 +* + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ +package com.att.dmf.mr.backends; + +import java.util.Collection; +import java.util.HashMap; + +import com.att.dmf.mr.CambriaApiException; + +/** + * This is the factory class to instantiate the consumer + * + * @author nilanjana.maity + * + */ + +public interface ConsumerFactory { + public static final String kSetting_EnableCache = "cambria.consumer.cache.enabled"; + public static boolean kDefault_IsCacheEnabled = true; + + /** + * User defined exception for Unavailable Exception + * + * @author nilanjana.maity + * + */ + public class UnavailableException extends Exception { + /** + * Unavailable Exception with message + * + * @param msg + */ + public UnavailableException(String msg) { + super(msg); + } + + /** + * Unavailable Exception with the throwable object + * + * @param t + */ + public UnavailableException(Throwable t) { + super(t); + } + + /** + * Unavailable Exception with the message and cause + * + * @param msg + * @param cause + */ + public UnavailableException(String msg, Throwable cause) { + super(msg, cause); + } + + private static final long serialVersionUID = 1L; + } + + /** + * For admin use, drop all cached consumers. + */ + public void dropCache(); + + /** + * Get or create a consumer for the given set of info (topic, group, id) + * + * @param topic + * @param consumerGroupId + * @param clientId + * @param timeoutMs + * @return + * @throws UnavailableException + */ + //public Consumer getConsumerFor(String topic, String consumerGroupId, + // String clientId, int timeoutMs) throws UnavailableException; + + /** + * For factories that employ a caching mechanism, this allows callers to + * explicitly destory a consumer that resides in the factory's cache. + * + * @param topic + * @param consumerGroupId + * @param clientId + */ + public void destroyConsumer(String topic, String consumerGroupId, + String clientId); + + /** + * For admin/debug, we provide access to the consumers + * + * @return a collection of consumers + */ + public Collection getConsumers(); + + public Consumer getConsumerFor(String topic, String consumerGroupName, String consumerId, int timeoutMs, String remotehost) throws UnavailableException, CambriaApiException; + public HashMap getConsumerForKafka011(String topic, String consumerGroupName, String consumerId, int timeoutMs, String remotehost) throws UnavailableException, CambriaApiException; + + + +} diff --git a/src/main/java/com/att/dmf/mr/backends/MetricsSet.java b/src/main/java/com/att/dmf/mr/backends/MetricsSet.java new file mode 100644 index 0000000..de665b8 --- /dev/null +++ b/src/main/java/com/att/dmf/mr/backends/MetricsSet.java @@ -0,0 +1,71 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 +* + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ +package com.att.dmf.mr.backends; + +import com.att.nsa.metrics.CdmMetricsRegistry; +/** + * This interface will help to generate metrics + * @author nilanjana.maity + * + */ +public interface MetricsSet extends CdmMetricsRegistry{ + + /** + * This method will setup cambria sender code + */ + public void setupCambriaSender (); + /** + * This method will define on route complete + * @param name + * @param durationMs + */ + public void onRouteComplete ( String name, long durationMs ); + /** + * This method will help the kafka publisher while publishing the messages + * @param amount + */ + public void publishTick ( int amount ); + /** + * This method will help the kafka consumer while consuming the messages + * @param amount + */ + public void consumeTick ( int amount ); + /** + * This method will call if the kafka consumer cache missed + */ + public void onKafkaConsumerCacheMiss (); + /** + * This method will call if the kafka consumer cache will be hit while publishing/consuming the messages + */ + public void onKafkaConsumerCacheHit (); + /** + * This method will call if the kafka consumer cache claimed + */ + public void onKafkaConsumerClaimed (); + /** + * This method will call if Kafka consumer is timed out + */ + public void onKafkaConsumerTimeout (); + + + +} diff --git a/src/main/java/com/att/dmf/mr/backends/Publisher.java b/src/main/java/com/att/dmf/mr/backends/Publisher.java new file mode 100644 index 0000000..8ff6ce9 --- /dev/null +++ b/src/main/java/com/att/dmf/mr/backends/Publisher.java @@ -0,0 +1,101 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 +* + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ +package com.att.dmf.mr.backends; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.kafka.clients.producer.ProducerRecord; + +import com.att.dmf.mr.beans.LogDetails; + +//import kafka.producer.KeyedMessage; +/** + * A publisher interface. Publishers receive messages and post them to a topic. + * @author peter + */ +public interface Publisher +{ + /** + * A message interface. The message has a key and a body. + * @author peter + */ + public interface message + { + /** + * Get the key for this message. The key is used to partition messages + * into "sub-streams" that have guaranteed order. The key can be null, + * which means the message can be processed without any concern for order. + * + * @return a key, possibly null + */ + String getKey(); + + /** + * Get the message body. + * @return a message body + */ + String getMessage(); + /** + * set the logging params for transaction enabled logging + * @param logDetails + */ + void setLogDetails (LogDetails logDetails); + /** + * Get the log details for transaction enabled logging + * @return LogDetails + */ + LogDetails getLogDetails (); + + /** + * boolean transactionEnabled + * @return true/false + */ + boolean isTransactionEnabled(); + /** + * Set the transaction enabled flag from prop file or topic based implementation + * @param transactionEnabled + */ + void setTransactionEnabled(boolean transactionEnabled); + } + + /** + * Send a single message to a topic. Equivalent to sendMessages with a list of size 1. + * @param topic + * @param msg + * @throws IOException + */ + public void sendMessage ( String topic, message msg ) throws IOException; + + /** + * Send messages to a topic. + * @param topic + * @param msgs + * @throws IOException + */ + public void sendMessages ( String topic, List msgs ) throws IOException; + + //public void sendBatchMessage(String topic ,ArrayList> kms) throws IOException; + public void sendBatchMessageNew(String topic ,ArrayList> kms) throws IOException; + public void sendMessagesNew( String topic, List msgs ) throws IOException; +} diff --git a/src/main/java/com/att/dmf/mr/backends/kafka/Kafka011Consumer.java b/src/main/java/com/att/dmf/mr/backends/kafka/Kafka011Consumer.java new file mode 100644 index 0000000..f7f5ba7 --- /dev/null +++ b/src/main/java/com/att/dmf/mr/backends/kafka/Kafka011Consumer.java @@ -0,0 +1,403 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 +* + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ +package com.att.dmf.mr.backends.kafka; + +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.FutureTask; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.RunnableFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.KafkaException; + +import com.att.dmf.mr.backends.Consumer; +import com.att.dmf.mr.constants.CambriaConstants; + +//import org.slf4j.Logger; +//import org.slf4j.LoggerFactory; + +import com.att.eelf.configuration.EELFLogger; +import com.att.eelf.configuration.EELFManager; + +/** + * A consumer instance that's created per-request. These are stateless so that + * clients can connect to this service as a proxy. + * + * @author Ram + * + */ +public class Kafka011Consumer implements Consumer { + private enum State { + OPENED, CLOSED + } + + // @Autowired + // KafkaLiveLockAvoider kafkaLiveLockAvoider; + /** + * KafkaConsumer() is constructor. It has following 4 parameters:- + * + * @param topic + * @param group + * @param id + * @param cc + * + */ + + public Kafka011Consumer(String topic, String group, String id, KafkaConsumer cc, + KafkaLiveLockAvoider2 klla) throws Exception { + fTopic = topic; + fGroup = group; + fId = id; + fCreateTimeMs = System.currentTimeMillis(); + fLastTouch = fCreateTimeMs; + fPendingMsgs = new LinkedBlockingQueue>(); + fLogTag = fGroup + "(" + fId + ")/" + fTopic; + offset = 0; + state = Kafka011Consumer.State.OPENED; + kConsumer = cc; + fKafkaLiveLockAvoider = klla; + synchronized (kConsumer) { + kConsumer.subscribe(Arrays.asList(topic)); + } + } + + private Consumer.Message makeMessage(final ConsumerRecord msg) { + return new Consumer.Message() { + @Override + public long getOffset() { + offset = msg.offset(); + return offset; + } + + @Override + public String getMessage() { + return new String(msg.value()); + } + }; + } + + @Override + public synchronized Consumer.Message nextMessage() { + + try { + if (fPendingMsgs.size() > 0) { + return makeMessage(fPendingMsgs.take()); + } + } catch (InterruptedException x) { + log.warn("After size>0, pending msg take() threw InterruptedException. Ignoring. (" + x.getMessage() + ")", + x); + } + + Callable run = new Callable() { + @Override + public Boolean call() throws Exception { + try { + ConsumerRecords records; + synchronized (kConsumer) { + records = kConsumer.poll(500); + } + for (ConsumerRecord record : records) { + // foundMsgs = true; + fPendingMsgs.offer(record); + } + + } catch (KafkaException x) { + log.debug(fLogTag + ": KafkaException " + x.getMessage()); + + } catch (java.lang.IllegalStateException | java.lang.IllegalArgumentException x) { + log.error(fLogTag + ": Illegal state/arg exception in Kafka consumer; dropping stream. " + + x.getMessage()); + + } + + // return null; + return true; + } + }; + + @SuppressWarnings({ "rawtypes", "unchecked" }) + RunnableFuture future = new FutureTask(run); + ExecutorService service = Executors.newSingleThreadExecutor(); + service.execute(future); + try { + future.get(5, TimeUnit.SECONDS); // wait 1 + // second + } catch (TimeoutException ex) { + // timed out. Try to stop the code if possible. + String apiNodeId = null; + try { + apiNodeId = InetAddress.getLocalHost().getCanonicalHostName() + ":" + CambriaConstants.kDefault_Port; + } catch (UnknownHostException e1) { + // TODO Auto-generated catch block + e1.printStackTrace(); + } + + try { + if (fKafkaLiveLockAvoider != null) + fKafkaLiveLockAvoider.unlockConsumerGroup(apiNodeId, fTopic + "::" + fGroup); + } catch (Exception e) { + log.error("unlockConsumerGroup(" + apiNodeId + "," + fTopic + "::" + fGroup); + } + + forcePollOnConsumer(); + future.cancel(true); + } catch (Exception ex) { + // timed out. Try to stop the code if possible. + future.cancel(true); + } + service.shutdown(); + + return null; + + } + + /** + * getName() method returns string type value. returns 3 parameters in + * string:- fTopic,fGroup,fId + * + * @Override + */ + public String getName() { + return fTopic + " : " + fGroup + " : " + fId; + } + + /** + * getCreateTimeMs() method returns long type value. returns fCreateTimeMs + * variable value + * + * @Override + * + */ + public long getCreateTimeMs() { + return fCreateTimeMs; + } + + public org.apache.kafka.clients.consumer.KafkaConsumer getConsumer() { + return kConsumer; + } + + /** + * getLastAccessMs() method returns long type value. returns fLastTouch + * variable value + * + * @Override + * + */ + public long getLastAccessMs() { + return fLastTouch; + } + + /** + * getOffset() method returns long type value. returns offset variable value + * + * @Override + * + */ + public long getOffset() { + return offset; + } + + /** + * commit offsets commitOffsets() method will be called on closed of + * KafkaConsumer. + * + * @Override + * + * + * public void commitOffsets() { if (getState() == + * KafkaConsumer.State.CLOSED) { log.warn("commitOffsets() called + * on closed KafkaConsumer " + getName()); return; } + * fConnector.commitOffsets(); } + */ + + /** + * updating fLastTouch with current time in ms + */ + public void touch() { + fLastTouch = System.currentTimeMillis(); + } + + /** + * getLastTouch() method returns long type value. returns fLastTouch + * variable value + * + */ + public long getLastTouch() { + return fLastTouch; + } + + /** + * setting the kafkaConsumer state to closed + */ + // public synchronized boolean close() { + public boolean close() { + if (getState() == Kafka011Consumer.State.CLOSED) { + + log.error("close() called on closed KafkaConsumer " + getName()); + return true; + } + + // fConnector.shutdown(); + boolean retVal = kafkaConnectorshuttask(); + return retVal; + + } + + /* time out if the kafka shutdown fails for some reason */ + + private boolean kafkaConnectorshuttask() { + Callable run = new Callable() { + @Override + public Boolean call() throws Exception { + + try { + // System.out.println("attempt to delete " + kConsumer); + kConsumer.close(); + + } catch (Exception e) { + log.info("@Kafka Stream shutdown erorr occurred " + getName() + " " + e); + throw new Exception("@Kafka Stream shutdown erorr occurred " + getName() + " " + e); + // return false; + } + log.info("Kafka connection closure with in 15 seconds by a Executors task"); + + return true; + } + }; + + @SuppressWarnings({ "rawtypes", "unchecked" }) + RunnableFuture future = new FutureTask(run); + ExecutorService service = Executors.newSingleThreadExecutor(); + service.execute(future); + try { + future.get(200, TimeUnit.SECONDS); // wait 1 + // second + } catch (TimeoutException ex) { + // timed out. Try to stop the code if possible. + log.info("Timeout Occured - Kafka connection closure with in 300 seconds by a Executors task"); + future.cancel(true); + setState(Kafka011Consumer.State.OPENED); + } catch (Exception ex) { + // timed out. Try to stop the code if possible. + log.error("Exception occured Occured - Kafka connection closure with in 300 seconds by a Executors task" + + ex); + future.cancel(true); + setState(Kafka011Consumer.State.OPENED); + return false; + } + service.shutdown(); + setState(Kafka011Consumer.State.CLOSED); + return true; + } + + public void forcePollOnConsumer() { + Kafka011ConsumerUtil.forcePollOnConsumer(fTopic, fGroup, fId); + + } + + /** + * getConsumerGroup() returns Consumer group + * + * @return + */ + public String getConsumerGroup() { + return fGroup; + } + + /** + * getConsumerId returns Consumer Id + * + * @return + */ + public String getConsumerId() { + return fId; + } + + /** + * getState returns kafkaconsumer state + * + * @return + */ + private Kafka011Consumer.State getState() { + return this.state; + } + + /** + * setState() sets the kafkaConsumer state + * + * @param state + */ + private void setState(Kafka011Consumer.State state) { + this.state = state; + } + + // private ConsumerConnector fConnector; + private final String fTopic; + private final String fGroup; + private final String fId; + private final String fLogTag; + // private final KafkaStream fStream; + private KafkaConsumer kConsumer; + private long fCreateTimeMs; + private long fLastTouch; + private long offset; + private Kafka011Consumer.State state; + private KafkaLiveLockAvoider2 fKafkaLiveLockAvoider; + private static final EELFLogger log = EELFManager.getInstance().getLogger(Kafka011Consumer.class); + private final LinkedBlockingQueue> fPendingMsgs; + //private ArrayList fconsumerList; + @Override + public void commitOffsets() { + if (getState() == Kafka011Consumer.State.CLOSED) { + log.warn("commitOffsets() called on closed KafkaConsumer " + getName()); + return; + } + kConsumer.commitSync(); + // fConsumer.close(); + + } + + @Override + public void setOffset(long offsetval) { + offset = offsetval; + } + + + public void setConsumerCache(KafkaConsumerCache cache) { + } + + //@Override + //public Message nextMessage(ArrayList l) { + // TODO Auto-generated method stub + //return null; + //} +} diff --git a/src/main/java/com/att/dmf/mr/backends/kafka/Kafka011ConsumerUtil.java b/src/main/java/com/att/dmf/mr/backends/kafka/Kafka011ConsumerUtil.java new file mode 100644 index 0000000..ea9407b --- /dev/null +++ b/src/main/java/com/att/dmf/mr/backends/kafka/Kafka011ConsumerUtil.java @@ -0,0 +1,126 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 +* + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ +package com.att.dmf.mr.backends.kafka; + +import java.util.ArrayList; + +import org.apache.kafka.clients.consumer.ConsumerRecords; + +//import org.slf4j.Logger; +//import org.slf4j.LoggerFactory; + +import com.att.eelf.configuration.EELFLogger; +import com.att.eelf.configuration.EELFManager; + +/** + * A consumer Util class for force polling when a rebalance issue is anticipated + * + * @author Ram + * + */ +public class Kafka011ConsumerUtil { + private static final EELFLogger log = EELFManager.getInstance().getLogger(Kafka011ConsumerUtil.class); + + /** + * @param fconsumercache + * @param fTopic + * @param fGroup + * @param fId + * @return + */ + public static boolean forcePollOnConsumer(final String fTopic, final String fGroup, final String fId) { + + Thread forcepollThread = new Thread(new Runnable() { + public void run() { + try { + + ArrayList kcsList = null; + + kcsList = KafkaConsumerCache.getInstance().getConsumerListForCG(fTopic + "::" + fGroup + "::", fId); + if (null != kcsList) { + for (int counter = 0; counter < kcsList.size(); counter++) { + + Kafka011Consumer kc1 = kcsList.get(counter); + + try { + ConsumerRecords recs = kc1.getConsumer().poll(0); + log.info("soft poll on " + kc1); + } catch (java.util.ConcurrentModificationException e) { + log.error("Error occurs for " + e); + } + + } + + } + + } catch (Exception e) { + log.error("Failed and go to Exception block for " + fGroup + " " + e.getMessage()); + e.printStackTrace(); + } + } + }); + + forcepollThread.start(); + + return false; + + } + + /** + * @param fconsumercache + * @param group + * @return + */ + public static boolean forcePollOnConsumer(final String group) { + + Thread forcepollThread = new Thread(new Runnable() { + public void run() { + try { + ArrayList kcsList = new ArrayList(); + kcsList = KafkaConsumerCache.getInstance().getConsumerListForCG(group); + + if (null != kcsList) { + + for (int counter = 0; counter < kcsList.size(); counter++) { + + Kafka011Consumer kc1 = kcsList.get(counter); + log.info("soft poll on remote nodes " + kc1); + ConsumerRecords recs = kc1.getConsumer().poll(0); + } + + } + + } catch (java.util.ConcurrentModificationException e) { + log.error("Error occurs for " + e); + } catch (Exception e) { + log.error("Failed and go to Exception block for " + group + " " + e.getMessage()); + e.printStackTrace(); + } + } + }); + + forcepollThread.start(); + return false; + + } + +} diff --git a/src/main/java/com/att/dmf/mr/backends/kafka/KafkaConsumer.txt b/src/main/java/com/att/dmf/mr/backends/kafka/KafkaConsumer.txt new file mode 100644 index 0000000..dd6259f --- /dev/null +++ b/src/main/java/com/att/dmf/mr/backends/kafka/KafkaConsumer.txt @@ -0,0 +1,386 @@ +package com.att.dmf.mr.backends.kafka; + +import java.util.Arrays; +import java.util.Properties; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.FutureTask; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.RunnableFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.common.KafkaException; + +import com.att.dmf.mr.backends.Consumer; + +//import org.slf4j.Logger; +//import org.slf4j.LoggerFactory; + +import com.att.eelf.configuration.EELFLogger; +import com.att.eelf.configuration.EELFManager; + +/** + * A consumer instance that's created per-request. These are stateless so that + * clients can connect to this service as a proxy. + * + * @author peter + * + */ +public class KafkaConsumer implements Consumer { + private enum State { + OPENED, CLOSED + } + + /** + * KafkaConsumer() is constructor. It has following 4 parameters:- + * + * @param topic + * @param group + * @param id + * @param cc + * + */ + + public KafkaConsumer(String topic, String group, String id, Properties prop) throws Exception { + fTopic = topic; + fGroup = group; + fId = id; + // fConnector = cc; + + fCreateTimeMs = System.currentTimeMillis(); + fLastTouch = fCreateTimeMs; + fPendingMsgs = new LinkedBlockingQueue> (); + fLogTag = fGroup + "(" + fId + ")/" + fTopic; + offset = 0; + + state = KafkaConsumer.State.OPENED; + + // final Map topicCountMap = new HashMap(); + // topicCountMap.put(fTopic, 1); + // log.info(fLogTag +" kafka Consumer started at " + // +System.currentTimeMillis()); + // final Map>> consumerMap = + // fConnector.createMessageStreams(topicCountMap); + // final List> streams = + // consumerMap.get(fTopic); + + kConsumer = new org.apache.kafka.clients.consumer.KafkaConsumer<>(prop); + // System.out.println("I am in Consumer APP " + topic + "-- " + + // fConsumer); + kConsumer.subscribe(Arrays.asList(topic)); + log.info(fLogTag + " kafka stream created in " + (System.currentTimeMillis() - fCreateTimeMs)); + System.out.println("-----id " +id); + + + try { ConsumerRecords records = + kConsumer.poll(500); System.out.println("---" + + records.count()); + + for (ConsumerRecord record : records) { + System.out.printf("offset = %d, key = %s, value = %s", + record.offset(), record.key(), record.value()); String t = + record.value(); + + } + }catch(Exception e){ + System.out.println( e); + } + System.out.println(fLogTag + " kafka stream created in " + (System.currentTimeMillis() - fCreateTimeMs)); + kConsumer.commitSync(); + // fConsumer.close(); + + + /* + * ConsumerRecords records = fConsumer.poll(500); + * System.out.println("---" + records.count()); + * + * for (ConsumerRecord record : records) { + * System.out.printf("offset = %d, key = %s, value = %s", + * record.offset(), record.key(), record.value()); String t = + * record.value(); + * + * } + * + * + * fConsumer.commitSync(); fConsumer.close(); + */ + + // fStream = streams.iterator().next(); + } + + + + private Consumer.Message makeMessage ( final ConsumerRecord msg ) + { + return new Consumer.Message() + { + @Override + public long getOffset () + { + return msg.offset (); + } + + @Override + public String getMessage () + { + return new String ( msg.value () ); + } + }; + } + + @Override + public synchronized Consumer.Message nextMessage () + { + + try + { + if ( fPendingMsgs.size () > 0 ) + { + return makeMessage ( fPendingMsgs.take () ); + } + } + catch ( InterruptedException x ) + { + log.warn ( "After size>0, pending msg take() threw InterruptedException. Ignoring. (" + x.getMessage () + ")", x ); + } + + + try + { + boolean foundMsgs = false; + System.out.println("entering into pollingWWWWWWWWWWWWWWWWW"); + final ConsumerRecords records = kConsumer.poll ( 100 ); + System.out.println("polling doneXXXXXXXXXXXXXXXXXXXXXXXXXXX...."); + for ( ConsumerRecord record : records ) + { + foundMsgs = true; + fPendingMsgs.offer ( record ); + } + + } + catch ( KafkaException x ) + { + log.debug ( fLogTag + ": KafkaException " + x.getMessage () ); + + } + catch ( java.lang.IllegalStateException | java.lang.IllegalArgumentException x ) + { + log.error ( fLogTag + ": Illegal state/arg exception in Kafka consumer; dropping stream. " + x.getMessage () ); + + } + + return null; + } + + + + /** + * getName() method returns string type value. returns 3 parameters in + * string:- fTopic,fGroup,fId + * + * @Override + */ + public String getName() { + return fTopic + " : " + fGroup + " : " + fId; + } + + /** + * getCreateTimeMs() method returns long type value. returns fCreateTimeMs + * variable value + * + * @Override + * + */ + public long getCreateTimeMs() { + return fCreateTimeMs; + } + + public org.apache.kafka.clients.consumer.KafkaConsumer getConsumer() { + return kConsumer; + } + + /** + * getLastAccessMs() method returns long type value. returns fLastTouch + * variable value + * + * @Override + * + */ + public long getLastAccessMs() { + return fLastTouch; + } + + + + /** + * getOffset() method returns long type value. returns offset variable value + * + * @Override + * + */ + public long getOffset() { + return offset; + } + + /** + * commit offsets commitOffsets() method will be called on closed of + * KafkaConsumer. + * + * @Override + * + * + * public void commitOffsets() { if (getState() == + * KafkaConsumer.State.CLOSED) { log.warn("commitOffsets() called + * on closed KafkaConsumer " + getName()); return; } + * fConnector.commitOffsets(); } + */ + + /** + * updating fLastTouch with current time in ms + */ + public void touch() { + fLastTouch = System.currentTimeMillis(); + } + + /** + * getLastTouch() method returns long type value. returns fLastTouch + * variable value + * + */ + public long getLastTouch() { + return fLastTouch; + } + + /** + * setting the kafkaConsumer state to closed + */ + public synchronized boolean close() { + + if (getState() == KafkaConsumer.State.CLOSED) { + + log.warn("close() called on closed KafkaConsumer " + getName()); + return true; + } + + setState(KafkaConsumer.State.CLOSED); + // fConnector.shutdown(); + boolean retVal = kafkaConnectorshuttask(); + return retVal; + + } + + /* time out if the kafka shutdown fails for some reason */ + + private boolean kafkaConnectorshuttask() { + Callable run = new Callable() { + @Override + public Boolean call() throws Exception { + // your code to be timed + try { + System.out.println("consumer closing....." + kConsumer); + kConsumer.close(); + } catch (Exception e) { + log.info("@@@@@@Kafka Stream shutdown erorr occurred " + getName() + " " + e); + } + log.info("Kafka connection closure with in 15 seconds by a Executors task"); + return true; + } + }; + + RunnableFuture future = new FutureTask(run); + ExecutorService service = Executors.newSingleThreadExecutor(); + service.execute(future); + Boolean result = null; + try { + result = (Boolean) future.get(15, TimeUnit.SECONDS); // wait 1 + // second + } catch (TimeoutException ex) { + // timed out. Try to stop the code if possible. + log.info("Timeout Occured - Kafka connection closure with in 15 seconds by a Executors task"); + future.cancel(true); + } catch (Exception ex) { + // timed out. Try to stop the code if possible. + log.info("Timeout Occured - Kafka connection closure with in 15 seconds by a Executors task" + ex); + future.cancel(true); + return false; + } + service.shutdown(); + return true; + } + + /** + * getConsumerGroup() returns Consumer group + * + * @return + */ + public String getConsumerGroup() { + return fGroup; + } + + /** + * getConsumerId returns Consumer Id + * + * @return + */ + public String getConsumerId() { + return fId; + } + + /** + * getState returns kafkaconsumer state + * + * @return + */ + private KafkaConsumer.State getState() { + return this.state; + } + + /** + * setState() sets the kafkaConsumer state + * + * @param state + */ + private void setState(KafkaConsumer.State state) { + this.state = state; + } + + // private ConsumerConnector fConnector; + private final String fTopic; + private final String fGroup; + private final String fId; + private final String fLogTag; + // private final KafkaStream fStream; + private final org.apache.kafka.clients.consumer.KafkaConsumer kConsumer; + private long fCreateTimeMs; + private long fLastTouch; + private long offset; + private KafkaConsumer.State state; + private static final EELFLogger log = EELFManager.getInstance().getLogger(KafkaConsumer.class); + private final LinkedBlockingQueue> fPendingMsgs; + // private static final Logger log = + // LoggerFactory.getLogger(KafkaConsumer.class); + + @Override + public void commitOffsets() { + if (getState() == KafkaConsumer.State.CLOSED) { + log.warn("commitOffsets() called on closed KafkaConsumer " + getName()); + return; + } + kConsumer.commitSync(); + // fConsumer.close(); + + } + + + + @Override + public void setOffset(long offsetval) { + // TODO Auto-generated method stub + offset = offsetval; + } +} diff --git a/src/main/java/com/att/dmf/mr/backends/kafka/KafkaConsumerCache.java b/src/main/java/com/att/dmf/mr/backends/kafka/KafkaConsumerCache.java new file mode 100644 index 0000000..4340cae --- /dev/null +++ b/src/main/java/com/att/dmf/mr/backends/kafka/KafkaConsumerCache.java @@ -0,0 +1,749 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 +* + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ +package com.att.dmf.mr.backends.kafka; + +import java.io.IOException; +import java.net.InetAddress; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Enumeration; +import java.util.LinkedList; +import java.util.List; +import java.util.Map.Entry; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import javax.annotation.Resource; + +import org.I0Itec.zkclient.exception.ZkException; +import org.I0Itec.zkclient.exception.ZkInterruptedException; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.imps.CuratorFrameworkState; +import org.apache.curator.framework.recipes.cache.ChildData; +import org.apache.curator.framework.recipes.cache.PathChildrenCache; +import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent; +import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener; +import org.apache.curator.framework.state.ConnectionState; +import org.apache.curator.framework.state.ConnectionStateListener; +import org.apache.curator.utils.EnsurePath; +import org.apache.curator.utils.ZKPaths; +import org.apache.http.annotation.NotThreadSafe; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.KeeperException.NoNodeException; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.context.annotation.ComponentScan; + +import com.att.ajsc.filemonitor.AJSCPropertiesMap; +import com.att.dmf.mr.backends.Consumer; +import com.att.dmf.mr.backends.MetricsSet; +import com.att.dmf.mr.constants.CambriaConstants; +import com.att.dmf.mr.exception.DMaaPErrorMessages; +import com.att.dmf.mr.utils.ConfigurationReader; +//import org.slf4j.Logger; +//import org.slf4j.LoggerFactory; +import com.att.eelf.configuration.EELFLogger; +import com.att.eelf.configuration.EELFManager; +import com.att.nsa.metrics.CdmTimer; + +/** + * @NotThreadSafe but expected to be used within KafkaConsumerFactory, which + * must be + * @author peter + * + */ +@NotThreadSafe +public class KafkaConsumerCache { + + private static KafkaConsumerCache kafkaconscache = null; + + public static KafkaConsumerCache getInstance() { + if (kafkaconscache == null) + kafkaconscache = new KafkaConsumerCache(); + + return kafkaconscache; + } + + private static final String kSetting_ConsumerHandoverWaitMs = "cambria.consumer.cache.handoverWaitMs"; + private static final int kDefault_ConsumerHandoverWaitMs = 500; + + private static final String kSetting_SweepEverySeconds = "cambria.consumer.cache.sweepFreqSeconds"; + private static final String kSetting_TouchEveryMs = "cambria.consumer.cache.touchFreqMs"; + + private static final String kSetting_ZkBasePath = "cambria.consumer.cache.zkBasePath"; + private static final String kDefault_ZkBasePath = CambriaConstants.kDefault_ZkRoot + "/consumerCache"; + + // kafka defaults to timing out a client after 6 seconds of inactivity, but + // it heartbeats even when the client isn't fetching. Here, we don't + // want to prematurely rebalance the consumer group. Assuming clients are + // hitting + // the server at least every 30 seconds, timing out after 2 minutes should + // be okay. + // FIXME: consider allowing the client to specify its expected call rate? + private static final long kDefault_MustTouchEveryMs = 1000 * 60 * 2; + + // check for expirations pretty regularly + private static final long kDefault_SweepEverySeconds = 15; + + private enum Status { + NOT_STARTED, CONNECTED, DISCONNECTED, SUSPENDED + } + + // @Qualifier("kafkalockavoid") + + // @Resource + // @Qualifier("kafkalockavoid") + // KafkaLiveLockAvoider2 kafkaLiveLockAvoider; + + @Autowired + private DMaaPErrorMessages errorMessages; + + // KafkaLiveLockAvoider kafkaLiveLockAvoider = new KafkaLiveLockAvoider(); + /** + * User defined exception class for kafka consumer cache + * + * @author nilanjana.maity + * + */ + public class KafkaConsumerCacheException extends Exception { + /** + * To throw the exception + * + * @param t + */ + KafkaConsumerCacheException(Throwable t) { + super(t); + } + + /** + * + * @param s + */ + public KafkaConsumerCacheException(String s) { + super(s); + } + + private static final long serialVersionUID = 1L; + } + + /** + * Creates a KafkaConsumerCache object. Before it is used, you must call + * startCache() + * + * @param apiId + * @param s + * @param metrics + */ + public KafkaConsumerCache() { + + String strkSetting_ZkBasePath = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, + kSetting_ZkBasePath); + if (null == strkSetting_ZkBasePath) + strkSetting_ZkBasePath = kDefault_ZkBasePath; + fBaseZkPath = strkSetting_ZkBasePath; + + fConsumers = new ConcurrentHashMap(); + fSweepScheduler = Executors.newScheduledThreadPool(1); + + curatorConsumerCache = null; + + status = Status.NOT_STARTED; + // Watcher for consumer rebalancing across nodes. Kafka011 rebalancing + // work around + + listener = new ConnectionStateListener() { + public void stateChanged(CuratorFramework client, ConnectionState newState) { + if (newState == ConnectionState.LOST) { + + log.info("ZooKeeper connection expired"); + handleConnectionLoss(); + } else if (newState == ConnectionState.READ_ONLY) { + log.warn("ZooKeeper connection set to read only mode."); + } else if (newState == ConnectionState.RECONNECTED) { + log.info("ZooKeeper connection re-established"); + handleReconnection(); + } else if (newState == ConnectionState.SUSPENDED) { + log.warn("ZooKeeper connection has been suspended."); + handleConnectionSuspended(); + } + } + }; + } + + /** + * Start the cache service. This must be called before any get/put + * operations. + * + * @param mode + * DMAAP or cambria + * @param curator + * @throws IOException + * @throws KafkaConsumerCacheException + */ + public void startCache(String mode, CuratorFramework curator) throws KafkaConsumerCacheException { + + if (fApiId == null) { + throw new IllegalArgumentException("API Node ID must be specified."); + } + + try { + + if (mode != null && mode.equals(CambriaConstants.DMAAP)) { + curator = getCuratorFramework(curator); + } + curator.getConnectionStateListenable().addListener(listener); + setStatus(Status.CONNECTED); + curatorConsumerCache = new PathChildrenCache(curator, fBaseZkPath, true); + curatorConsumerCache.start(); + curatorConsumerCache.getListenable().addListener(new PathChildrenCacheListener() { + public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception { + switch (event.getType()) { + case CHILD_ADDED: { + try { + final String apiId = new String(event.getData().getData()); + final String consumer = ZKPaths.getNodeFromPath(event.getData().getPath()); + + log.info(apiId + " started consumer " + consumer); + } catch (Exception ex) { + log.info("#Error Occured during Adding child" + ex); + } + break; + } + case CHILD_UPDATED: { + final String apiId = new String(event.getData().getData()); + final String consumer = ZKPaths.getNodeFromPath(event.getData().getPath()); + + if (fConsumers.containsKey(consumer)) { + log.info(apiId + " claimed consumer " + consumer + " from " + fApiId + + " but wont hand over"); + // Commented so that it dont give the connection + // until the active node is running for this client + // id. + dropClaimedConsumer(consumer); + } + + break; + } + case CHILD_REMOVED: { + final String consumer = ZKPaths.getNodeFromPath(event.getData().getPath()); + + if (fConsumers.containsKey(consumer)) { + log.info("Someone wanted consumer " + consumer + + " gone; but not removing it from the cache"); + dropConsumer(consumer, false); + } + + break; + } + + default: + break; + } + } + }); + + // initialize the ZK path + EnsurePath ensurePath = new EnsurePath(fBaseZkPath); + ensurePath.ensure(curator.getZookeeperClient()); + + // final long freq = fSettings.getLong(kSetting_SweepEverySeconds, + // kDefault_SweepEverySeconds); + long freq = kDefault_SweepEverySeconds; + String strkSetting_SweepEverySeconds = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, + kSetting_SweepEverySeconds); + if (null != strkSetting_SweepEverySeconds) { + freq = Long.parseLong(strkSetting_SweepEverySeconds); + } + + fSweepScheduler.scheduleAtFixedRate(new sweeper(), freq, freq, TimeUnit.SECONDS); + log.info("KafkaConsumerCache started"); + log.info("sweeping cached clients every " + freq + " seconds"); + } catch (ZkException e) { + log.error("@@@@@@ ZK Exception occured for " + e); + throw new KafkaConsumerCacheException(e); + } catch (Exception e) { + log.error("@@@@@@ Exception occured for " + e); + throw new KafkaConsumerCacheException(e); + } + } + + /** + * Getting the curator oject to start the zookeeper connection estabished + * + * @param curator + * @return curator object + */ + public static CuratorFramework getCuratorFramework(CuratorFramework curator) { + if (curator.getState() == CuratorFrameworkState.LATENT) { + curator.start(); + + try { + curator.blockUntilConnected(); + } catch (InterruptedException e) { + // Ignore + log.error("error while setting curator framework :" + e.getMessage()); + } + } + + return curator; + } + + /** + * Stop the cache service. + */ + public void stopCache() { + setStatus(Status.DISCONNECTED); + + final CuratorFramework curator = ConfigurationReader.getCurator(); + + if (curator != null) { + try { + curator.getConnectionStateListenable().removeListener(listener); + curatorConsumerCache.close(); + log.info("Curator client closed"); + } catch (ZkInterruptedException e) { + log.warn("Curator client close interrupted: " + e.getMessage()); + } catch (IOException e) { + log.warn("Error while closing curator PathChildrenCache for KafkaConsumerCache" + e.getMessage()); + } + + curatorConsumerCache = null; + } + + if (fSweepScheduler != null) { + fSweepScheduler.shutdownNow(); + log.info("cache sweeper stopped"); + } + + if (fConsumers != null) { + fConsumers.clear(); + fConsumers = null; + } + + setStatus(Status.NOT_STARTED); + + log.info("Consumer cache service stopped"); + } + + /** + * Get a cached consumer by topic, group, and id, if it exists (and remains + * valid) In addition, this method waits for all other consumer caches in + * the cluster to release their ownership and delete their version of this + * consumer. + * + * @param topic + * @param consumerGroupId + * @param clientId + * @return a consumer, or null + */ + public Kafka011Consumer getConsumerFor(String topic, String consumerGroupId, String clientId) + throws KafkaConsumerCacheException { + if (getStatus() != KafkaConsumerCache.Status.CONNECTED) + throw new KafkaConsumerCacheException("The cache service is unavailable."); + + final String consumerKey = makeConsumerKey(topic, consumerGroupId, clientId); + final Kafka011Consumer kc = fConsumers.get(consumerKey); + + if (kc != null) { + log.debug("Consumer cache hit for [" + consumerKey + "], last was at " + kc.getLastTouch()); + kc.touch(); + fMetrics.onKafkaConsumerCacheHit(); + } else { + log.debug("Consumer cache miss for [" + consumerKey + "]"); + fMetrics.onKafkaConsumerCacheMiss(); + } + + return kc; + } + + /** + * Get a cached consumer by topic, group, and id, if it exists (and remains + * valid) In addition, this method waits for all other consumer caches in + * the cluster to release their ownership and delete their version of this + * consumer. + * + * @param topic + * @param consumerGroupId + * @param clientId + * @return a consumer, or null + */ + public ArrayList getConsumerListForCG(String topicgroup, String clientId) + throws KafkaConsumerCacheException { + if (getStatus() != KafkaConsumerCache.Status.CONNECTED) + throw new KafkaConsumerCacheException("The cache service is unavailable."); + ArrayList kcl = new ArrayList(); + // final String consumerKey = makeConsumerKey(topic, consumerGroupId, + // clientId); + Enumeration strEnum = fConsumers.keys(); + String consumerLocalKey = null; + while (strEnum.hasMoreElements()) { + consumerLocalKey = strEnum.nextElement(); + + if (consumerLocalKey.startsWith(topicgroup) && (!consumerLocalKey.endsWith("::" + clientId))) { + + // System.out.println("consumer key returning from + // getConsumerListforCG +++++++++ " + consumerLocalKey + // + " " + fConsumers.get(consumerLocalKey)); + kcl.add(fConsumers.get(consumerLocalKey)); + + } + } + + return kcl; + } + + public ArrayList getConsumerListForCG(String group) throws KafkaConsumerCacheException { + if (getStatus() != KafkaConsumerCache.Status.CONNECTED) + throw new KafkaConsumerCacheException("The cache service is unavailable."); + ArrayList kcl = new ArrayList(); + // final String consumerKey = makeConsumerKey(topic, consumerGroupId, + // clientId); + Enumeration strEnum = fConsumers.keys(); + String consumerLocalKey = null; + while (strEnum.hasMoreElements()) { + consumerLocalKey = strEnum.nextElement(); + + if (consumerLocalKey.startsWith(group)) { + + // System.out.println("consumer key returning from + // getConsumerListforCG +++++++++ " + consumerLocalKey + // + " " + fConsumers.get(consumerLocalKey)); + kcl.add(fConsumers.get(consumerLocalKey)); + + } + } + + return kcl; + } + + /** + * Put a consumer into the cache by topic, group and ID + * + * @param topic + * @param consumerGroupId + * @param consumerId + * @param consumer + * @throws KafkaConsumerCacheException + */ + public void putConsumerFor(String topic, String consumerGroupId, String consumerId, Kafka011Consumer consumer) + throws KafkaConsumerCacheException { + if (getStatus() != KafkaConsumerCache.Status.CONNECTED) + throw new KafkaConsumerCacheException("The cache service is unavailable."); + + final String consumerKey = makeConsumerKey(topic, consumerGroupId, consumerId); + fConsumers.put(consumerKey, consumer); + + // String appId = "node-instance-"+i; + + log.info("^@ Consumer Added to Cache Consumer Key" + consumerKey + " ApiId" + fApiId); + } + + public Collection getConsumers() { + return new LinkedList(fConsumers.values()); + } + + /** + * This method is to drop all the consumer + */ + public void dropAllConsumers() { + for (Entry entry : fConsumers.entrySet()) { + dropConsumer(entry.getKey(), true); + } + + // consumers should be empty here + if (fConsumers.size() > 0) { + log.warn("During dropAllConsumers, the consumer map is not empty."); + fConsumers.clear(); + } + } + + /** + * Drop a consumer from our cache due to a timeout + * + * @param key + */ + private void dropTimedOutConsumer(String key) { + fMetrics.onKafkaConsumerTimeout(); + + if (!fConsumers.containsKey(key)) { + log.warn("Attempted to drop a timed out consumer which was not in our cache: " + key); + return; + } + + // First, drop this consumer from our cache + boolean isdrop = dropConsumer(key, true); + if (!isdrop) { + return; + } + final CuratorFramework curator = ConfigurationReader.getCurator(); + + try { + curator.delete().guaranteed().forPath(fBaseZkPath + "/" + key); + log.info(" ^ deleted " + fBaseZkPath + "/" + key); + } catch (NoNodeException e) { + log.warn("A consumer was deleted from " + fApiId + + "'s cache, but no Cambria API node had ownership of it in ZooKeeper"); + } catch (Exception e) { + log.debug("Unexpected exception while deleting consumer: " + e.getMessage()); + log.info(" %%%%%%@# Unexpected exception while deleting consumer: " + e.getMessage()); + } + + try { + int consumerHandoverWaitMs = kDefault_ConsumerHandoverWaitMs; + String strkSetting_ConsumerHandoverWaitMs = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, + kSetting_ConsumerHandoverWaitMs); + if (strkSetting_ConsumerHandoverWaitMs != null) + consumerHandoverWaitMs = Integer.parseInt(strkSetting_ConsumerHandoverWaitMs); + Thread.sleep(consumerHandoverWaitMs); + } catch (InterruptedException e) { + // Ignore + } + log.info("Dropped " + key + " consumer due to timeout"); + } + + /** + * Drop a consumer from our cache due to another API node claiming it as + * their own. + * + * @param key + */ + private void dropClaimedConsumer(String key) { + // if the consumer is still in our cache, it implies a claim. + if (fConsumers.containsKey(key)) { + fMetrics.onKafkaConsumerClaimed(); + log.info("Consumer [" + key + "] claimed by another node."); + } + log.info("^dropping claimed Kafka consumer " + key); + dropConsumer(key, false); + } + + /** + * Removes the consumer from the cache and closes its connection to the + * kafka broker(s). + * + * @param key + * @param dueToTimeout + */ + private boolean dropConsumer(String key, boolean dueToTimeout) { + final Kafka011Consumer kc = fConsumers.get(key); + log.info("closing Kafka consumer " + key + " object " + kc); + if (kc != null) { + // log.info("closing Kafka consumer " + key); + if (kc.close()) { + fConsumers.remove(key); + + } else { + return false; + } + } + return true; + } + + // private final rrNvReadable fSettings; + private MetricsSet fMetrics; + private final String fBaseZkPath; + private final ScheduledExecutorService fSweepScheduler; + private String fApiId; + + public void setfMetrics(final MetricsSet metrics) { + this.fMetrics = metrics; + } + + public void setfApiId(final String id) { + this.fApiId = id; + } + + private final ConnectionStateListener listener; + + private ConcurrentHashMap fConsumers; + private PathChildrenCache curatorConsumerCache; + + private volatile Status status; + + private void handleReconnection() { + + log.info("Reading current cache data from ZK and synchronizing local cache"); + final List cacheData = curatorConsumerCache.getCurrentData(); + // Remove all the consumers in this API nodes cache that now belong to + // other API nodes. + for (ChildData cachedConsumer : cacheData) { + final String consumerId = ZKPaths.getNodeFromPath(cachedConsumer.getPath()); + final String owningApiId = (cachedConsumer.getData() != null) ? new String(cachedConsumer.getData()) + : "undefined"; + if (!fApiId.equals(owningApiId)) { + fConsumers.remove(consumerId); // Commented to avoid removing + // the value cache hashmap but the lock still exists. + // This is not considered in kafka consumer Factory + log.info("@@@ Validating current cache data from ZK and synchronizing local cache" + owningApiId + + " removing " + consumerId); + } + } + + setStatus(Status.CONNECTED); + } + + private void handleConnectionSuspended() { + log.info("Suspending cache until ZK connection is re-established"); + + setStatus(Status.SUSPENDED); + } + + private void handleConnectionLoss() { + log.info("Clearing consumer cache (shutting down all Kafka consumers on this node)"); + + setStatus(Status.DISCONNECTED); + + closeAllCachedConsumers(); + fConsumers.clear(); + } + + private void closeAllCachedConsumers() { + for (Entry entry : fConsumers.entrySet()) { + try { + entry.getValue().close(); + } catch (Exception e) { + log.info("@@@@@@ Error occurd while closing Clearing All cache " + e); + } + } + } + + private static String makeConsumerKey(String topic, String consumerGroupId, String clientId) { + return topic + "::" + consumerGroupId + "::" + clientId; + } + + /** + * This method is to get a lock + * + * @param topic + * @param consumerGroupId + * @param consumerId + * @throws KafkaConsumerCacheException + */ + public void signalOwnership(final String topic, final String consumerGroupId, final String consumerId) + throws KafkaConsumerCacheException { + // get a lock at /:::: + final String consumerKey = makeConsumerKey(topic, consumerGroupId, consumerId); + final CdmTimer timer = new CdmTimer(fMetrics, "CacheSignalOwnership"); + + try { + final String consumerPath = fBaseZkPath + "/" + consumerKey; + log.debug(fApiId + " attempting to claim ownership of consumer " + consumerKey); + final CuratorFramework curator = ConfigurationReader.getCurator(); + + try { + curator.setData().forPath(consumerPath, fApiId.getBytes()); + } catch (KeeperException.NoNodeException e) { + curator.create().creatingParentsIfNeeded().forPath(consumerPath, fApiId.getBytes()); + } + log.info(fApiId + " successfully claimed ownership of consumer " + consumerKey); + timer.end(); + } catch (Exception e) { + log.error(fApiId + " failed to claim ownership of consumer " + consumerKey); + throw new KafkaConsumerCacheException(e); + } + + log.info("Backing off to give the Kafka broker time to clean up the ZK data for this consumer"); + + try { + int consumerHandoverWaitMs = kDefault_ConsumerHandoverWaitMs; + String strkSetting_ConsumerHandoverWaitMs = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, + kSetting_ConsumerHandoverWaitMs); + if (strkSetting_ConsumerHandoverWaitMs != null) + consumerHandoverWaitMs = Integer.parseInt(strkSetting_ConsumerHandoverWaitMs); + Thread.sleep(consumerHandoverWaitMs); + } catch (InterruptedException e) { + // Ignore + } + } + + public KafkaLiveLockAvoider2 getkafkaLiveLockAvoiderObj() { + return null; + } + + public void sweep() { + final LinkedList removals = new LinkedList(); + long mustTouchEveryMs = kDefault_MustTouchEveryMs; + String strkSetting_TouchEveryMs = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, + kSetting_TouchEveryMs); + if (null != strkSetting_TouchEveryMs) { + mustTouchEveryMs = Long.parseLong(strkSetting_TouchEveryMs); + } + + // final long mustTouchEveryMs = + // fSettings.getLong(kSetting_TouchEveryMs, kDefault_MustTouchEveryMs); + final long oldestAllowedTouchMs = System.currentTimeMillis() - mustTouchEveryMs; + + for (Entry e : fConsumers.entrySet()) { + final long lastTouchMs = e.getValue().getLastTouch(); + log.debug("consumer #####1" + e.getKey() + " " + lastTouchMs + " < " + oldestAllowedTouchMs); + + if (lastTouchMs < oldestAllowedTouchMs) { + log.info("consumer " + e.getKey() + " has expired"); + removals.add(e.getKey()); + } + } + + for (String key : removals) { + dropTimedOutConsumer(key); + } + } + + /** + * Creating a thread to run the sweep method + * + * @author nilanjana.maity + * + */ + private class sweeper implements Runnable { + /** + * run method + */ + public void run() { + sweep(); + } + } + + /** + * This method is to drop consumer + * + * @param topic + * @param consumerGroup + * @param clientId + */ + public void dropConsumer(String topic, String consumerGroup, String clientId) { + dropConsumer(makeConsumerKey(topic, consumerGroup, clientId), false); + } + + private Status getStatus() { + return this.status; + } + + private void setStatus(Status status) { + this.status = status; + } + + private static final EELFLogger log = EELFManager.getInstance().getLogger(KafkaConsumerCache.class); + // private static final Logger log = + // LoggerFactory.getLogger(KafkaConsumerCache.class); +} \ No newline at end of file diff --git a/src/main/java/com/att/dmf/mr/backends/kafka/KafkaLiveLockAvoider2.java b/src/main/java/com/att/dmf/mr/backends/kafka/KafkaLiveLockAvoider2.java new file mode 100644 index 0000000..805701a --- /dev/null +++ b/src/main/java/com/att/dmf/mr/backends/kafka/KafkaLiveLockAvoider2.java @@ -0,0 +1,161 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 +* + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ +package com.att.dmf.mr.backends.kafka; + + +import java.util.List; +import java.util.concurrent.TimeUnit; + +import javax.annotation.PostConstruct; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.recipes.locks.InterProcessMutex; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.Watcher; +import com.att.eelf.configuration.EELFLogger; +import com.att.eelf.configuration.EELFManager; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.context.annotation.ComponentScan; +import org.springframework.stereotype.Component; +import org.springframework.stereotype.Service; + +//@ComponentScan(basePackages="com.att.dmf.mr.backends.kafka") +@Component +public class KafkaLiveLockAvoider2 { + + public static final String ZNODE_ROOT = "/live-lock-avoid"; + public static final String ZNODE_LOCKS = "/locks"; + public static final String ZNODE_UNSTICK_TASKS ="/unstick-tasks"; + + private static String locksPath = ZNODE_ROOT+ZNODE_LOCKS; + private static String tasksPath = ZNODE_ROOT+ZNODE_UNSTICK_TASKS; + private static final EELFLogger log = EELFManager.getInstance().getLogger(KafkaLiveLockAvoider2.class.getName()); + + @Autowired + @Qualifier("curator") + private CuratorFramework curatorFramework; + + @PostConstruct + public void init() { + System.out.println("Welcome......................................................................................"); + try { + if (curatorFramework.checkExists().forPath(locksPath) == null) { + curatorFramework.create().creatingParentsIfNeeded().forPath(locksPath); + } + if (curatorFramework.checkExists().forPath(tasksPath) == null) { + curatorFramework.create().creatingParentsIfNeeded().forPath(tasksPath); + } + + } catch (Exception e) { + //e.printStackTrace(); + log.error("Error during creation of permanent Znodes under /live-lock-avoid ",e); + + } + + + } + public void unlockConsumerGroup(String appId, String groupName) throws Exception { + + log.info("Signalling unlock to all conumsers of in group [{}] now, " , groupName); + + String fullLockPath = String.format("%s/%s", locksPath, groupName ); + String fullTasksPath = null; + + try { + + //Use the Curator recipe for a Mutex lock, only one process can be broadcasting unlock instructions for a group + InterProcessMutex lock = new InterProcessMutex(curatorFramework, fullLockPath); + if ( lock.acquire(100L, TimeUnit.MILLISECONDS) ) + { + try + { + List taskNodes = curatorFramework.getChildren().forPath(tasksPath); + for (String taskNodeName : taskNodes) { + if(!taskNodeName.equals(appId)) { + + fullTasksPath = String.format("%s/%s/%s", tasksPath, taskNodeName, groupName); + log.info("Writing groupName {} to path {}",groupName, fullTasksPath); + + + if(curatorFramework.checkExists().forPath(fullTasksPath) != null) { + curatorFramework.delete().forPath(fullTasksPath); + } + curatorFramework.create().withMode(CreateMode.EPHEMERAL).forPath(fullTasksPath); + } + } + + + } + finally + { + //Curator lock recipe requires a acquire() to be followed by a release() + lock.release(); + } + }else { + log.info("Could not obtain the avoider lock, another process has the avoider lock? {}", !lock.isAcquiredInThisProcess() ); + } + + + } catch (Exception e) { + log.error("Error setting up either lock ZNode {} or task ZNode {}",fullLockPath, fullTasksPath,e); + throw e; + } + + + } + + /* + * Shoud be called once per MR server instance. + * + */ + public void startNewWatcherForServer(String appId, LiveLockAvoidance avoidanceCallback) { + LockInstructionWatcher instructionWatcher = new LockInstructionWatcher(curatorFramework,avoidanceCallback,this); + assignNewProcessNode(appId, instructionWatcher); + + } + + + protected void assignNewProcessNode(String appId, Watcher processNodeWatcher ) { + + String taskHolderZnodePath = ZNODE_ROOT+ZNODE_UNSTICK_TASKS+"/"+appId; + //Watcher processNodeWatcher = createWatcher(); + + try { + + if(curatorFramework.checkExists().forPath(taskHolderZnodePath) != null) { + curatorFramework.delete().deletingChildrenIfNeeded().forPath(taskHolderZnodePath); + + } + curatorFramework.create().forPath(taskHolderZnodePath); + //setup the watcher + curatorFramework.getChildren().usingWatcher(processNodeWatcher).inBackground().forPath(taskHolderZnodePath); + log.info("Done creating task holder and watcher for APP name: {}",appId); + + } catch (Exception e) { + log.error("Could not add new processing node for name {}", appId, e); + } + + } + + +} diff --git a/src/main/java/com/att/dmf/mr/backends/kafka/KafkaPublisher.java b/src/main/java/com/att/dmf/mr/backends/kafka/KafkaPublisher.java new file mode 100644 index 0000000..30209f0 --- /dev/null +++ b/src/main/java/com/att/dmf/mr/backends/kafka/KafkaPublisher.java @@ -0,0 +1,235 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 +* + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ +package com.att.dmf.mr.backends.kafka; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.LinkedList; +import java.util.List; +import java.util.Properties; + +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.json.JSONException; +import org.springframework.beans.factory.annotation.Qualifier; + +import com.att.dmf.mr.backends.Publisher; +import com.att.dmf.mr.constants.CambriaConstants; +//import org.slf4j.Logger; +//import org.slf4j.LoggerFactory; +import com.att.eelf.configuration.EELFLogger; +import com.att.eelf.configuration.EELFManager; +import com.att.nsa.drumlin.till.nv.rrNvReadable; + +//import kafka.FailedToSendMessageException; +//import kafka.javaapi.producer.Producer; +//import kafka.producer.KeyedMessage; +//import kafka.producer.ProducerConfig; +//import kafka.producer.KeyedMessage; + +/** + * Sends raw JSON objects into Kafka. + * + * Could improve space: BSON rather than JSON? + * + * @author peter + * + */ + +public class KafkaPublisher implements Publisher { + /** + * constructor initializing + * + * @param settings + * @throws rrNvReadable.missingReqdSetting + */ + public KafkaPublisher(@Qualifier("propertyReader") rrNvReadable settings) throws rrNvReadable.missingReqdSetting { + //fSettings = settings; + + final Properties props = new Properties(); + /*transferSetting(fSettings, props, "metadata.broker.list", "localhost:9092"); + transferSetting(fSettings, props, "request.required.acks", "1"); + transferSetting(fSettings, props, "message.send.max.retries", "5"); + transferSetting(fSettings, props, "retry.backoff.ms", "150"); */ + String kafkaConnUrl= com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"kafka.metadata.broker.list"); + if(null==kafkaConnUrl){ + + kafkaConnUrl="localhost:9092"; + } + //String jaaspath="C:/ATT/Apps/dmaapCodedmaap-framework/dmaap/bundleconfig-local/etc/appprops/kafka_pub_jaas.conf"; + // props.put("bootstrap.servers", bootSever); + //System.setProperty("java.security.auth.login.config",jaaspath); + + /*transferSetting( props, "sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='admin_secret';"); + transferSetting( props, "security.protocol", "SASL_PLAINTEXT"); + transferSetting( props, "sasl.mechanism", "PLAIN");*/ + transferSetting( props, "bootstrap.servers",kafkaConnUrl); + //transferSetting( props, "metadata.broker.list", kafkaConnUrl); + transferSetting( props, "request.required.acks", "1"); + transferSetting( props, "message.send.max.retries", "5"); + transferSetting(props, "retry.backoff.ms", "150"); + + //props.put("serializer.class", "kafka.serializer.StringEncoder"); + + props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + + //fConfig = new ProducerConfig(props); + //fProducer = new Producer(fConfig); + fProducer = new KafkaProducer<>(props); + } + + /** + * Send a message with a given topic and key. + * + * @param msg + * @throws FailedToSendMessageException + * @throws JSONException + */ + @Override + public void sendMessage(String topic, message msg) throws IOException{ + final List msgs = new LinkedList(); + msgs.add(msg); + sendMessages(topic, msgs); + } + + /** + * method publishing batch messages + * This method is commented from 0.8 to 0.11 upgrade + * @param topic + * @param kms + * throws IOException + * + public void sendBatchMessage(String topic, ArrayList> kms) throws IOException { + try { + fProducer.send(kms); + + } catch (FailedToSendMessageException excp) { + log.error("Failed to send message(s) to topic [" + topic + "].", excp); + throw new FailedToSendMessageException(excp.getMessage(), excp); + } + + } */ + + + /* + * Kafka 11.0 Interface + * @see com.att.nsa.cambria.backends.Publisher#sendBatchMessageNew(java.lang.String, java.util.ArrayList) + */ + public void sendBatchMessageNew(String topic, ArrayList > kms) throws IOException { + try { + for (ProducerRecord km : kms) { + fProducer.send(km); + } + + } catch (Exception excp) { + log.error("Failed to send message(s) to topic [" + topic + "].", excp); + throw new IOException(excp.getMessage(), excp); + } + + } + + /** + * Send a set of messages. Each must have a "key" string value. + * + * @param topic + * @param msg + * @throws FailedToSendMessageException + * @throws JSONException + * + @Override + public void sendMessages(String topic, List msgs) + throws IOException, FailedToSendMessageException { + log.info("sending " + msgs.size() + " events to [" + topic + "]"); + + final List> kms = new ArrayList>(msgs.size()); + for (message o : msgs) { + final KeyedMessage data = new KeyedMessage(topic, o.getKey(), o.toString()); + kms.add(data); + } + try { + fProducer.send(kms); + + } catch (FailedToSendMessageException excp) { + log.error("Failed to send message(s) to topic [" + topic + "].", excp); + throw new FailedToSendMessageException(excp.getMessage(), excp); + } + } */ + @Override + public void sendMessagesNew(String topic, List msgs) + throws IOException { + log.info("sending " + msgs.size() + " events to [" + topic + "]"); +try{ + final List> kms = new ArrayList>(msgs.size()); + for (message o : msgs) { + + final ProducerRecord data = new ProducerRecord(topic, o.getKey(), o.toString()); + //kms.add(data); + + try { + + fProducer.send(data); + + } catch (Exception excp) { + log.error("Failed to send message(s) to topic [" + topic + "].", excp); + throw new Exception(excp.getMessage(), excp); + } + } + + }catch(Exception e){} +} + //private final rrNvReadable fSettings; + + //private ProducerConfig fConfig; + private Producer fProducer; + + /** + * It sets the key value pair + * @param topic + * @param msg + * @param key + * @param defVal + */ + private void transferSetting(Properties props, String key, String defVal) { + String kafka_prop= com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"kafka." + key); + if (null==kafka_prop) kafka_prop=defVal; + //props.put(key, settings.getString("kafka." + key, defVal)); + props.put(key, kafka_prop); + } + + //private static final Logger log = LoggerFactory.getLogger(KafkaPublisher.class); + + private static final EELFLogger log = EELFManager.getInstance().getLogger(KafkaPublisher.class); + + @Override + public void sendMessages(String topic, List msgs) throws IOException { + // TODO Auto-generated method stub + + } + + //@Override + //public void sendBatchMessage(String topic, ArrayList> kms) throws IOException { + // TODO Auto-generated method stub + + //} +} \ No newline at end of file diff --git a/src/main/java/com/att/dmf/mr/backends/kafka/LiveLockAvoidance.java b/src/main/java/com/att/dmf/mr/backends/kafka/LiveLockAvoidance.java new file mode 100644 index 0000000..a13ecea --- /dev/null +++ b/src/main/java/com/att/dmf/mr/backends/kafka/LiveLockAvoidance.java @@ -0,0 +1,45 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 +* + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ +package com.att.dmf.mr.backends.kafka; + + + +/** + * Live Lock Avoidance interface. To be implemented by the main message router client + * + */ +public interface LiveLockAvoidance { + + /** + * Gets the unique id + * @return the unique id for the Message Router server instance + */ + String getAppId(); + + + /** + * Main callback to inform the local MR server instance that all consumers in a group need to soft poll + * @param groupName name of the Kafka consumer group needed a soft poll + */ + void handleRebalanceUnlock( String groupName); + +} diff --git a/src/main/java/com/att/dmf/mr/backends/kafka/LockInstructionWatcher.java b/src/main/java/com/att/dmf/mr/backends/kafka/LockInstructionWatcher.java new file mode 100644 index 0000000..5d3bc47 --- /dev/null +++ b/src/main/java/com/att/dmf/mr/backends/kafka/LockInstructionWatcher.java @@ -0,0 +1,100 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 +* + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ +package com.att.dmf.mr.backends.kafka; + +import java.util.List; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; + +import com.att.eelf.configuration.EELFLogger; +import com.att.eelf.configuration.EELFManager; + +/** + * + * LockInstructionWatcher + * A package-private class used internally by the KafkaLiveLockAvoider. + * + * This class implements the zookeeper Watcher callback and listens for changes on child nodes changing. + * Each child node is actually a Kafka group name that needs to be soft polled. Deletion of the child nodes + * after soft poll unlocking is finished. + * + * + */ +public class LockInstructionWatcher implements Watcher { + + private CuratorFramework curatorFramework; + private LiveLockAvoidance avoidanceCallback; + private KafkaLiveLockAvoider2 avoider; + + private static final EELFLogger log = EELFManager.getInstance().getLogger(LockInstructionWatcher.class.getName()); + + + public LockInstructionWatcher(CuratorFramework curatorFramework, LiveLockAvoidance avoidanceCallback, + KafkaLiveLockAvoider2 avoider) { + super(); + this.curatorFramework = curatorFramework; + this.avoidanceCallback = avoidanceCallback; + this.avoider = avoider; + } + + + @Override + public void process(WatchedEvent event) { + + switch (event.getType()) { + case NodeChildrenChanged: + + + try { + + log.info("node children changed at path: {}", event.getPath()); + //String grpName = new String(curatorFramework.getData().forPath(event.getPath())); + List children = curatorFramework.getChildren().forPath(event.getPath()); + + log.info("found children nodes prodcessing now"); + for (String child : children) { + String childPath = String.format("%s/%s", event.getPath(), child); + log.info("Processing child task at node {}",childPath); + avoidanceCallback.handleRebalanceUnlock( child); + log.info("Deleting child task at node {}",childPath); + curatorFramework.delete().forPath(childPath); + } + //reset the watch with the avoider + avoider.assignNewProcessNode(avoidanceCallback.getAppId(), this); + + + } catch (Exception e) { + log.error("Error manipulating ZNode data in watcher",e); + } + + break; + + default: + log.info("Listner fired on path: {}, with event: {}", event.getPath(), event.getType()); + break; + } + } + + +} diff --git a/src/main/java/com/att/dmf/mr/backends/memory/MemoryConsumerFactory.java b/src/main/java/com/att/dmf/mr/backends/memory/MemoryConsumerFactory.java new file mode 100644 index 0000000..0c34bfd --- /dev/null +++ b/src/main/java/com/att/dmf/mr/backends/memory/MemoryConsumerFactory.java @@ -0,0 +1,182 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 +* + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ +package com.att.dmf.mr.backends.memory; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; + +import com.att.dmf.mr.CambriaApiException; +import com.att.dmf.mr.backends.Consumer; +import com.att.dmf.mr.backends.ConsumerFactory; +/** + * + * @author anowarul.islam + * + */ +public class MemoryConsumerFactory implements ConsumerFactory +{ + /** + * + * Initializing constructor + * @param q + */ + public MemoryConsumerFactory ( MemoryQueue q ) + { + fQueue = q; + } + + /** + * + * @param topic + * @param consumerGroupId + * @param clientId + * @param timeoutMs + * @return Consumer + */ + @Override + public Consumer getConsumerFor ( String topic, String consumerGroupId, String clientId, int timeoutMs, String remotehost ) + { + return new MemoryConsumer ( topic, consumerGroupId ); + } + + private final MemoryQueue fQueue; + + /** + * + * Define nested inner class + * + */ + private class MemoryConsumer implements Consumer + { + /** + * + * Initializing MemoryConsumer constructor + * @param topic + * @param consumer + * + */ + public MemoryConsumer ( String topic, String consumer ) + { + fTopic = topic; + fConsumer = consumer; + fCreateMs = System.currentTimeMillis (); + fLastAccessMs = fCreateMs; + } + + @Override + /** + * + * return consumer details + */ + public Message nextMessage () + { + return fQueue.get ( fTopic, fConsumer ); + } + + private final String fTopic; + private final String fConsumer; + private final long fCreateMs; + private long fLastAccessMs; + + @Override + public boolean close() { + //Nothing to close/clean up. + return true; + } + /** + * + */ + public void commitOffsets() + { + // ignoring this aspect + } + /** + * get offset + */ + public long getOffset() + { + return 0; + } + + @Override + /** + * get consumer topic name + */ + public String getName () + { + return fTopic + "/" + fConsumer; + } + + @Override + public long getCreateTimeMs () + { + return fCreateMs; + } + + @Override + public long getLastAccessMs () + { + return fLastAccessMs; + } + + + + @Override + public void setOffset(long offset) { + // TODO Auto-generated method stub + + } + + + } + + @Override + public void destroyConsumer(String topic, String consumerGroupId, + String clientId) { + //No cache for memory consumers, so NOOP + } + + @Override + public void dropCache () + { + // nothing to do - there's no cache here + } + + @Override + /** + * @return ArrayList + */ + public Collection getConsumers () + { + return new ArrayList (); + } + + @Override + public HashMap getConsumerForKafka011(String topic, String consumerGroupName, String consumerId, int timeoutMs, + String remotehost) throws UnavailableException, CambriaApiException { + // TODO Auto-generated method stub + return null; + } + + +} diff --git a/src/main/java/com/att/dmf/mr/backends/memory/MemoryMetaBroker.java b/src/main/java/com/att/dmf/mr/backends/memory/MemoryMetaBroker.java new file mode 100644 index 0000000..22f0588 --- /dev/null +++ b/src/main/java/com/att/dmf/mr/backends/memory/MemoryMetaBroker.java @@ -0,0 +1,198 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 +* + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ +package com.att.dmf.mr.backends.memory; + +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Set; +import java.util.TreeSet; + +import com.att.dmf.mr.metabroker.Broker; +import com.att.dmf.mr.metabroker.Topic; +import com.att.nsa.configs.ConfigDb; +import com.att.nsa.security.NsaAcl; +import com.att.nsa.security.NsaApiKey; + +/** + * + * @author anowarul.islam + * + */ +public class MemoryMetaBroker implements Broker { + /** + * + * @param mq + * @param configDb + * @param settings + */ + public MemoryMetaBroker(MemoryQueue mq, ConfigDb configDb) { + //public MemoryMetaBroker(MemoryQueue mq, ConfigDb configDb, rrNvReadable settings) { + fQueue = mq; + fTopics = new HashMap(); + } + + @Override + public List getAllTopics() { + return new LinkedList(fTopics.values()); + } + + @Override + public Topic getTopic(String topic) { + return fTopics.get(topic); + } + + @Override + public Topic createTopic(String topic, String desc, String ownerApiId, int partitions, int replicas, + boolean transactionEnabled) throws TopicExistsException { + if (getTopic(topic) != null) { + throw new TopicExistsException(topic); + } + fQueue.createTopic(topic); + fTopics.put(topic, new MemTopic(topic, desc, ownerApiId, transactionEnabled)); + return getTopic(topic); + } + + @Override + public void deleteTopic(String topic) { + fTopics.remove(topic); + fQueue.removeTopic(topic); + } + + private final MemoryQueue fQueue; + private final HashMap fTopics; + + private static class MemTopic implements Topic { + /** + * constructor initialization + * + * @param name + * @param desc + * @param owner + * @param transactionEnabled + */ + public MemTopic(String name, String desc, String owner, boolean transactionEnabled) { + fName = name; + fDesc = desc; + fOwner = owner; + ftransactionEnabled = transactionEnabled; + fReaders = null; + fWriters = null; + } + + @Override + public String getOwner() { + return fOwner; + } + + @Override + public NsaAcl getReaderAcl() { + return fReaders; + } + + @Override + public NsaAcl getWriterAcl() { + return fWriters; + } + + @Override + public void checkUserRead(NsaApiKey user) throws AccessDeniedException { + if (fReaders != null && (user == null || !fReaders.canUser(user.getKey()))) { + throw new AccessDeniedException(user == null ? "" : user.getKey()); + } + } + + @Override + public void checkUserWrite(NsaApiKey user) throws AccessDeniedException { + if (fWriters != null && (user == null || !fWriters.canUser(user.getKey()))) { + throw new AccessDeniedException(user == null ? "" : user.getKey()); + } + } + + @Override + public String getName() { + return fName; + } + + @Override + public String getDescription() { + return fDesc; + } + + @Override + public void permitWritesFromUser(String publisherId, NsaApiKey asUser) throws AccessDeniedException { + if (!fOwner.equals(asUser.getKey())) { + throw new AccessDeniedException("User does not own this topic " + fName); + } + if (fWriters == null) { + fWriters = new NsaAcl(); + } + fWriters.add(publisherId); + } + + @Override + public void denyWritesFromUser(String publisherId, NsaApiKey asUser) throws AccessDeniedException { + if (!fOwner.equals(asUser.getKey())) { + throw new AccessDeniedException("User does not own this topic " + fName); + } + fWriters.remove(publisherId); + } + + @Override + public void permitReadsByUser(String consumerId, NsaApiKey asUser) throws AccessDeniedException { + if (!fOwner.equals(asUser.getKey())) { + throw new AccessDeniedException("User does not own this topic " + fName); + } + if (fReaders == null) { + fReaders = new NsaAcl(); + } + fReaders.add(consumerId); + } + + @Override + public void denyReadsByUser(String consumerId, NsaApiKey asUser) throws AccessDeniedException { + if (!fOwner.equals(asUser.getKey())) { + throw new AccessDeniedException("User does not own this topic " + fName); + } + fReaders.remove(consumerId); + } + + private final String fName; + private final String fDesc; + private final String fOwner; + private NsaAcl fReaders; + private NsaAcl fWriters; + private boolean ftransactionEnabled; + + @Override + public boolean isTransactionEnabled() { + return ftransactionEnabled; + } + + @Override + public Set getOwners() { + final TreeSet set = new TreeSet (); + set.add ( fOwner ); + return set; + } + } +} diff --git a/src/main/java/com/att/dmf/mr/backends/memory/MemoryQueue.java b/src/main/java/com/att/dmf/mr/backends/memory/MemoryQueue.java new file mode 100644 index 0000000..0629972 --- /dev/null +++ b/src/main/java/com/att/dmf/mr/backends/memory/MemoryQueue.java @@ -0,0 +1,207 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 +* + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ +package com.att.dmf.mr.backends.memory; + +import java.util.ArrayList; +import java.util.HashMap; + +import com.att.dmf.mr.backends.Consumer; +import com.att.dmf.mr.backends.Publisher.message; + +/** + * When broker type is memory, then this class is doing all the topic related + * operations + * + * @author anowarul.islam + * + */ +public class MemoryQueue { + // map from topic to list of msgs + private HashMap fQueue; + private HashMap> fOffsets; + + /** + * constructor storing hashMap objects in Queue and Offsets object + */ + public MemoryQueue() { + fQueue = new HashMap(); + fOffsets = new HashMap>(); + } + + /** + * method used to create topic + * + * @param topic + */ + public synchronized void createTopic(String topic) { + LogBuffer q = fQueue.get(topic); + if (q == null) { + q = new LogBuffer(1024 * 1024); + fQueue.put(topic, q); + } + } + + /** + * method used to remove topic + * + * @param topic + */ + public synchronized void removeTopic(String topic) { + LogBuffer q = fQueue.get(topic); + if (q != null) { + fQueue.remove(topic); + } + } + + /** + * method to write message on topic + * + * @param topic + * @param m + */ + public synchronized void put(String topic, message m) { + LogBuffer q = fQueue.get(topic); + if (q == null) { + createTopic(topic); + q = fQueue.get(topic); + } + q.push(m.getMessage()); + } + + /** + * method to read consumer messages + * + * @param topic + * @param consumerName + * @return + */ + public synchronized Consumer.Message get(String topic, String consumerName) { + final LogBuffer q = fQueue.get(topic); + if (q == null) { + return null; + } + + HashMap offsetMap = fOffsets.get(consumerName); + if (offsetMap == null) { + offsetMap = new HashMap(); + fOffsets.put(consumerName, offsetMap); + } + Integer offset = offsetMap.get(topic); + if (offset == null) { + offset = 0; + } + + final msgInfo result = q.read(offset); + if (result != null && result.msg != null) { + offsetMap.put(topic, result.offset + 1); + } + return result; + } + + /** + * static inner class used to details about consumed messages + * + * @author anowarul.islam + * + */ + private static class msgInfo implements Consumer.Message { + /** + * published message which is consumed + */ + public String msg; + /** + * offset associated with message + */ + public int offset; + + /** + * get offset of messages + */ + @Override + public long getOffset() { + return offset; + } + + /** + * get consumed message + */ + @Override + public String getMessage() { + return msg; + } + } + + /** + * + * @author sneha.d.desai + * + * private LogBuffer class has synchronized push and read method + */ + private class LogBuffer { + private int fBaseOffset; + private final int fMaxSize; + private final ArrayList fList; + + /** + * constructor initializing the offset, maxsize and list + * + * @param maxSize + */ + public LogBuffer(int maxSize) { + fBaseOffset = 0; + fMaxSize = maxSize; + fList = new ArrayList(); + } + + /** + * pushing message + * + * @param msg + */ + public synchronized void push(String msg) { + fList.add(msg); + while (fList.size() > fMaxSize) { + fList.remove(0); + fBaseOffset++; + } + } + + /** + * reading messages + * + * @param offset + * @return + */ + public synchronized msgInfo read(int offset) { + final int actual = Math.max(0, offset - fBaseOffset); + + final msgInfo mi = new msgInfo(); + mi.msg = (actual >= fList.size()) ? null : fList.get(actual); + if (mi.msg == null) + return null; + + mi.offset = actual + fBaseOffset; + return mi; + } + + } +} diff --git a/src/main/java/com/att/dmf/mr/backends/memory/MemoryQueuePublisher.java b/src/main/java/com/att/dmf/mr/backends/memory/MemoryQueuePublisher.java new file mode 100644 index 0000000..2b43ed3 --- /dev/null +++ b/src/main/java/com/att/dmf/mr/backends/memory/MemoryQueuePublisher.java @@ -0,0 +1,92 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 +* + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ +package com.att.dmf.mr.backends.memory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.kafka.clients.producer.ProducerRecord; + +import com.att.dmf.mr.backends.Publisher; +import com.att.dmf.mr.metabroker.Broker.TopicExistsException; + + + +/** + * + * @author anowarul.islam + * + */ +public class MemoryQueuePublisher implements Publisher { + /** + * + * @param q + * @param b + */ + public MemoryQueuePublisher(MemoryQueue q, MemoryMetaBroker b) { + fBroker = b; + fQueue = q; + } + + + /** + * + * @param topic + * @param msg + * @throws IOException + */ + @Override + public void sendMessage(String topic, message msg) throws IOException { + if (null == fBroker.getTopic(topic)) { + try { + fBroker.createTopic(topic, topic, null, 8, 3, false); + } catch (TopicExistsException e) { + throw new RuntimeException(e); + } + } + fQueue.put(topic, msg); + } + + @Override + /** + * @param topic + * @param msgs + * @throws IOException + */ + + public void sendBatchMessageNew(String topic, ArrayList> kms) throws IOException { + + } + + public void sendMessagesNew(String topic, List msgs) throws IOException { + } + + public void sendMessages(String topic, List msgs) throws IOException { + for (message m : msgs) { + sendMessage(topic, m); + } + } + + private final MemoryMetaBroker fBroker; + private final MemoryQueue fQueue; +} diff --git a/src/main/java/com/att/dmf/mr/backends/memory/MessageLogger.java b/src/main/java/com/att/dmf/mr/backends/memory/MessageLogger.java new file mode 100644 index 0000000..8e41c9f --- /dev/null +++ b/src/main/java/com/att/dmf/mr/backends/memory/MessageLogger.java @@ -0,0 +1,109 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 +* + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ +package com.att.dmf.mr.backends.memory; + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.kafka.clients.producer.ProducerRecord; + +import com.att.dmf.mr.backends.Publisher; + +//import kafka.producer.KeyedMessage; + +/** + * class used for logging perspective + * + * @author anowarul.islam + * + */ +public class MessageLogger implements Publisher { + public MessageLogger() { + } + + public void setFile(File f) throws FileNotFoundException { + fStream = new FileOutputStream(f, true); + } + + /** + * + * @param topic + * @param msg + * @throws IOException + */ + @Override + public void sendMessage(String topic, message msg) throws IOException { + logMsg(msg); + } + + /** + * @param topic + * @param msgs + * @throws IOException + */ + @Override + public void sendMessages(String topic, List msgs) throws IOException { + for (message m : msgs) { + logMsg(m); + } + } + + /** + * @param topic + * @param kms + * @throws IOException + + @Override + public void sendBatchMessage(String topic, ArrayList> kms) throws + + IOException { + } + */ + private FileOutputStream fStream; + + /** + * + * @param msg + * @throws IOException + */ + private void logMsg(message msg) throws IOException { + String key = msg.getKey(); + if (key == null) + key = ""; + + fStream.write('['); + fStream.write(key.getBytes()); + fStream.write("] ".getBytes()); + fStream.write(msg.getMessage().getBytes()); + fStream.write('\n'); + } + public void sendBatchMessageNew(String topic, ArrayList> kms) throws IOException { + + } + + public void sendMessagesNew(String topic, List msgs) throws IOException { + } +} -- cgit 1.2.3-korg