Diffstat (limited to 'src/main/java/com/att/dmf/mr/backends')

 src/main/java/com/att/dmf/mr/backends/Consumer.java                     |  1
 src/main/java/com/att/dmf/mr/backends/kafka/KafkaConsumerCache.java     | 59
 src/main/java/com/att/dmf/mr/backends/kafka/KafkaLiveLockAvoider2.java  |  8
 src/main/java/com/att/dmf/mr/backends/kafka/KafkaPublisher.java         | 37
 src/main/java/com/att/dmf/mr/backends/memory/MemoryConsumerFactory.java | 18
 src/main/java/com/att/dmf/mr/backends/memory/MemoryMetaBroker.java      | 37
 src/main/java/com/att/dmf/mr/backends/memory/MemoryQueue.java           |  6
 7 files changed, 75 insertions(+), 91 deletions(-)
diff --git a/src/main/java/com/att/dmf/mr/backends/Consumer.java b/src/main/java/com/att/dmf/mr/backends/Consumer.java
index 2743fc3..f4a9a80 100644
--- a/src/main/java/com/att/dmf/mr/backends/Consumer.java
+++ b/src/main/java/com/att/dmf/mr/backends/Consumer.java
@@ -21,7 +21,6 @@
*******************************************************************************/
package com.att.dmf.mr.backends;
-import java.util.ArrayList;
/**
* A consumer interface. Consumers pull the next message from a given topic.
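[Editor's note: the hunk above only removes an unused import, so the interface body is not shown. A hypothetical minimal sketch of the pull-style contract the javadoc describes, inferred from the MemoryConsumer implementation later in this diff; the real com.att.dmf.mr.backends.Consumer declares more than this.]

    // Hypothetical sketch; method set inferred from MemoryConsumer below.
    public interface PullConsumer {
        // Returns the next message for this consumer's topic, or null when none is waiting.
        Object nextMessage();

        // Releases any held resources; returns true on success.
        boolean close();
    }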
diff --git a/src/main/java/com/att/dmf/mr/backends/kafka/KafkaConsumerCache.java b/src/main/java/com/att/dmf/mr/backends/kafka/KafkaConsumerCache.java
index 126711a..83c08ec 100644
--- a/src/main/java/com/att/dmf/mr/backends/kafka/KafkaConsumerCache.java
+++ b/src/main/java/com/att/dmf/mr/backends/kafka/KafkaConsumerCache.java
@@ -61,8 +61,8 @@ import com.att.dmf.mr.backends.MetricsSet;
import com.att.dmf.mr.constants.CambriaConstants;
import com.att.dmf.mr.exception.DMaaPErrorMessages;
import com.att.dmf.mr.utils.ConfigurationReader;
-//import org.slf4j.Logger;
-//import org.slf4j.LoggerFactory;
+
+
import com.att.eelf.configuration.EELFLogger;
import com.att.eelf.configuration.EELFManager;
import com.att.nsa.metrics.CdmTimer;
@@ -101,7 +101,7 @@ public class KafkaConsumerCache {
// the server at least every 30 seconds, timing out after 2 minutes should
// be okay.
// FIXME: consider allowing the client to specify its expected call rate?
- private static final long kDefault_MustTouchEveryMs = 1000 * 60 * 2;
+ private static final long kDefault_MustTouchEveryMs = 1000L*60*2;
// check for expirations pretty regularly
private static final long kDefault_SweepEverySeconds = 15;
@@ -110,16 +110,13 @@ public class KafkaConsumerCache {
NOT_STARTED, CONNECTED, DISCONNECTED, SUSPENDED
}
- // @Qualifier("kafkalockavoid")
-
- // @Resource
- // @Qualifier("kafkalockavoid")
- // KafkaLiveLockAvoider2 kafkaLiveLockAvoider;
+
+
@Autowired
private DMaaPErrorMessages errorMessages;
- // KafkaLiveLockAvoider kafkaLiveLockAvoider = new KafkaLiveLockAvoider();
+
/**
* User defined exception class for kafka consumer cache
*
@@ -267,8 +264,8 @@ public class KafkaConsumerCache {
EnsurePath ensurePath = new EnsurePath(fBaseZkPath);
ensurePath.ensure(curator.getZookeeperClient());
- // final long freq = fSettings.getLong(kSetting_SweepEverySeconds,
- // kDefault_SweepEverySeconds);
+
+
long freq = kDefault_SweepEverySeconds;
String strkSetting_SweepEverySeconds = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
kSetting_SweepEverySeconds);
@@ -301,8 +298,8 @@ public class KafkaConsumerCache {
try {
curator.blockUntilConnected();
} catch (InterruptedException e) {
- // Ignore
- log.error("error while setting curator framework :" + e.getMessage());
+ log.error("error while setting curator framework :",e);
+ Thread.currentThread().interrupt();
}
}
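[Editor's note: this hunk, and two more below, replace silently swallowed InterruptedExceptions with logging plus re-interruption. A minimal sketch of the idiom; class name and the stderr stand-in for the EELF logger are illustrative.]

    public class HandoverWait {
        // Swallowing InterruptedException hides a cancellation request; logging and
        // re-interrupting preserves the signal for callers higher up the stack.
        public void waitForHandover(long waitMs) {
            try {
                Thread.sleep(waitMs);
            } catch (InterruptedException e) {
                System.err.println("interrupted while waiting: " + e); // stand-in for log.error(...)
                Thread.currentThread().interrupt(); // restore the interrupted status
            }
        }
    }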
@@ -393,8 +390,8 @@ public class KafkaConsumerCache {
if (getStatus() != KafkaConsumerCache.Status.CONNECTED)
throw new KafkaConsumerCacheException("The cache service is unavailable.");
ArrayList<Kafka011Consumer> kcl = new ArrayList<>();
- // final String consumerKey = makeConsumerKey(topic, consumerGroupId,
- // clientId);
+
+
Enumeration<String> strEnum = fConsumers.keys();
String consumerLocalKey = null;
while (strEnum.hasMoreElements()) {
@@ -402,9 +399,9 @@ public class KafkaConsumerCache {
if (consumerLocalKey.startsWith(topicgroup) && (!consumerLocalKey.endsWith("::" + clientId))) {
- // System.out.println("consumer key returning from
- // getConsumerListforCG +++++++++ " + consumerLocalKey
- // + " " + fConsumers.get(consumerLocalKey));
+
+
+
kcl.add(fConsumers.get(consumerLocalKey));
}
@@ -417,8 +414,7 @@ public class KafkaConsumerCache {
if (getStatus() != KafkaConsumerCache.Status.CONNECTED)
throw new KafkaConsumerCacheException("The cache service is unavailable.");
ArrayList<Kafka011Consumer> kcl = new ArrayList<>();
- // final String consumerKey = makeConsumerKey(topic, consumerGroupId,
- // clientId);
+
Enumeration<String> strEnum = fConsumers.keys();
String consumerLocalKey = null;
while (strEnum.hasMoreElements()) {
@@ -426,9 +422,7 @@ public class KafkaConsumerCache {
if (consumerLocalKey.startsWith(group)) {
- // System.out.println("consumer key returning from
- // getConsumerListforCG +++++++++ " + consumerLocalKey
- // + " " + fConsumers.get(consumerLocalKey));
+
kcl.add(fConsumers.get(consumerLocalKey));
}
@@ -454,7 +448,7 @@ public class KafkaConsumerCache {
final String consumerKey = makeConsumerKey(topic, consumerGroupId, consumerId);
fConsumers.put(consumerKey, consumer);
- // String appId = "node-instance-"+i;
+
log.info("^@ Consumer Added to Cache Consumer Key" + consumerKey + " ApiId" + fApiId);
}
@@ -517,7 +511,8 @@ public class KafkaConsumerCache {
consumerHandoverWaitMs = Integer.parseInt(strkSetting_ConsumerHandoverWaitMs);
Thread.sleep(consumerHandoverWaitMs);
} catch (InterruptedException e) {
- // Ignore
+ log.error("InterruptedException in dropTimedOutConsumer",e);
+ Thread.currentThread().interrupt();
}
log.info("Dropped " + key + " consumer due to timeout");
}
@@ -549,7 +544,7 @@ public class KafkaConsumerCache {
final Kafka011Consumer kc = fConsumers.get(key);
log.info("closing Kafka consumer " + key + " object " + kc);
if (kc != null) {
- // log.info("closing Kafka consumer " + key);
+
if (kc.close()) {
fConsumers.remove(key);
@@ -644,9 +639,8 @@ public class KafkaConsumerCache {
throws KafkaConsumerCacheException {
// get a lock at <base>/<topic>::<consumerGroupId>::<consumerId>
final String consumerKey = makeConsumerKey(topic, consumerGroupId, consumerId);
- final CdmTimer timer = new CdmTimer(fMetrics, "CacheSignalOwnership");
- try {
+ try(final CdmTimer timer = new CdmTimer(fMetrics, "CacheSignalOwnership")) {
final String consumerPath = fBaseZkPath + "/" + consumerKey;
log.debug(fApiId + " attempting to claim ownership of consumer " + consumerKey);
final CuratorFramework curator = ConfigurationReader.getCurator();
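[Editor's note: the hunk above moves the CdmTimer into a try-with-resources header, which compiles only if CdmTimer implements AutoCloseable; the diff implies this but does not show it. A generic sketch of the same pattern with a stand-in timer type.]

    // Stand-in for CdmTimer; assumes an equivalent AutoCloseable contract so the
    // timer is always stopped, even when the timed block throws.
    class ScopedTimer implements AutoCloseable {
        private final long startNs = System.nanoTime();
        private final String name;

        ScopedTimer(String name) { this.name = name; }

        @Override
        public void close() {
            System.out.println(name + " took " + (System.nanoTime() - startNs) / 1_000_000 + " ms");
        }
    }

    class TimerDemo {
        public static void main(String[] args) throws Exception {
            try (ScopedTimer t = new ScopedTimer("CacheSignalOwnership")) {
                Thread.sleep(50); // simulated work inside the timed scope
            }
        }
    }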
@@ -673,7 +667,8 @@ public class KafkaConsumerCache {
consumerHandoverWaitMs = Integer.parseInt(strkSetting_ConsumerHandoverWaitMs);
Thread.sleep(consumerHandoverWaitMs);
} catch (InterruptedException e) {
- // Ignore
+ log.error("InterruptedException in signalOwnership",e);
+ Thread.currentThread().interrupt();
}
}
@@ -690,8 +685,7 @@ public class KafkaConsumerCache {
mustTouchEveryMs = Long.parseLong(strkSetting_TouchEveryMs);
}
- // final long mustTouchEveryMs =
- // fSettings.getLong(kSetting_TouchEveryMs, kDefault_MustTouchEveryMs);
+
final long oldestAllowedTouchMs = System.currentTimeMillis() - mustTouchEveryMs;
for (Entry<String, Kafka011Consumer> e : fConsumers.entrySet()) {
@@ -744,6 +738,5 @@ public class KafkaConsumerCache {
}
private static final EELFLogger log = EELFManager.getInstance().getLogger(KafkaConsumerCache.class);
- // private static final Logger log =
- // LoggerFactory.getLogger(KafkaConsumerCache.class);
+
}
\ No newline at end of file
diff --git a/src/main/java/com/att/dmf/mr/backends/kafka/KafkaLiveLockAvoider2.java b/src/main/java/com/att/dmf/mr/backends/kafka/KafkaLiveLockAvoider2.java
index 805701a..f521b41 100644
--- a/src/main/java/com/att/dmf/mr/backends/kafka/KafkaLiveLockAvoider2.java
+++ b/src/main/java/com/att/dmf/mr/backends/kafka/KafkaLiveLockAvoider2.java
@@ -35,9 +35,7 @@ import com.att.eelf.configuration.EELFLogger;
import com.att.eelf.configuration.EELFManager;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
-import org.springframework.context.annotation.ComponentScan;
import org.springframework.stereotype.Component;
-import org.springframework.stereotype.Service;
//@ComponentScan(basePackages="com.att.dmf.mr.backends.kafka")
@Component
@@ -57,7 +55,7 @@ public class KafkaLiveLockAvoider2 {
@PostConstruct
public void init() {
- System.out.println("Welcome......................................................................................");
+ log.info("Welcome......................................................................................");
try {
if (curatorFramework.checkExists().forPath(locksPath) == null) {
curatorFramework.create().creatingParentsIfNeeded().forPath(locksPath);
@@ -67,7 +65,7 @@ public class KafkaLiveLockAvoider2 {
}
} catch (Exception e) {
- //e.printStackTrace();
+
log.error("Error during creation of permanent Znodes under /live-lock-avoid ",e);
}
@@ -138,7 +136,7 @@ public class KafkaLiveLockAvoider2 {
protected void assignNewProcessNode(String appId, Watcher processNodeWatcher ) {
String taskHolderZnodePath = ZNODE_ROOT+ZNODE_UNSTICK_TASKS+"/"+appId;
- //Watcher processNodeWatcher = createWatcher();
+
try {
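[Editor's note: KafkaLiveLockAvoider2.init() uses the common Curator idiom of checking for a znode and creating missing parents on demand. A self-contained sketch against the Apache Curator API; the connect string and path are placeholders, since the real client is injected.]

    import org.apache.curator.framework.CuratorFramework;
    import org.apache.curator.framework.CuratorFrameworkFactory;
    import org.apache.curator.retry.ExponentialBackoffRetry;

    public class EnsureZnode {
        public static void main(String[] args) throws Exception {
            // Placeholder connect string and retry policy.
            try (CuratorFramework curator = CuratorFrameworkFactory.newClient(
                    "localhost:2181", new ExponentialBackoffRetry(1000, 3))) {
                curator.start();
                String locksPath = "/live-lock-avoid/locks"; // illustrative path
                // Create the node (and any missing parents) only if it does not exist yet.
                if (curator.checkExists().forPath(locksPath) == null) {
                    curator.create().creatingParentsIfNeeded().forPath(locksPath);
                }
            }
        }
    }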
diff --git a/src/main/java/com/att/dmf/mr/backends/kafka/KafkaPublisher.java b/src/main/java/com/att/dmf/mr/backends/kafka/KafkaPublisher.java
index 30209f0..735e372 100644
--- a/src/main/java/com/att/dmf/mr/backends/kafka/KafkaPublisher.java
+++ b/src/main/java/com/att/dmf/mr/backends/kafka/KafkaPublisher.java
@@ -41,11 +41,7 @@ import com.att.eelf.configuration.EELFLogger;
import com.att.eelf.configuration.EELFManager;
import com.att.nsa.drumlin.till.nv.rrNvReadable;
-//import kafka.FailedToSendMessageException;
-//import kafka.javaapi.producer.Producer;
-//import kafka.producer.KeyedMessage;
-//import kafka.producer.ProducerConfig;
-//import kafka.producer.KeyedMessage;
+
/**
* Sends raw JSON objects into Kafka.
@@ -76,26 +72,23 @@ public class KafkaPublisher implements Publisher {
kafkaConnUrl="localhost:9092";
}
- //String jaaspath="C:/ATT/Apps/dmaapCodedmaap-framework/dmaap/bundleconfig-local/etc/appprops/kafka_pub_jaas.conf";
- // props.put("bootstrap.servers", bootSever);
- //System.setProperty("java.security.auth.login.config",jaaspath);
+
- /*transferSetting( props, "sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='admin_secret';");
- transferSetting( props, "security.protocol", "SASL_PLAINTEXT");
- transferSetting( props, "sasl.mechanism", "PLAIN");*/
+
+
transferSetting( props, "bootstrap.servers",kafkaConnUrl);
- //transferSetting( props, "metadata.broker.list", kafkaConnUrl);
+
transferSetting( props, "request.required.acks", "1");
transferSetting( props, "message.send.max.retries", "5");
transferSetting(props, "retry.backoff.ms", "150");
- //props.put("serializer.class", "kafka.serializer.StringEncoder");
+
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
- //fConfig = new ProducerConfig(props);
- //fProducer = new Producer<String, String>(fConfig);
+
+
fProducer = new KafkaProducer<>(props);
}
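[Editor's note: the constructor above now builds the producer from plain properties. A stripped-down sketch of an equivalent setup against the standard Kafka client API; broker address and retry values are illustrative. Note the diff keeps legacy property names (request.required.acks, message.send.max.retries) from the old Scala producer; the modern Java client equivalents are acks and retries, used below.]

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;

    public class ProducerSetup {
        public static Producer<String, String> create() {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092"); // illustrative broker
            props.put("acks", "1");
            props.put("retries", "5");
            props.put("retry.backoff.ms", "150");
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            return new KafkaProducer<>(props);
        }
    }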
@@ -180,11 +173,11 @@ public class KafkaPublisher implements Publisher {
throws IOException {
log.info("sending " + msgs.size() + " events to [" + topic + "]");
try{
- final List<ProducerRecord<String, String>> kms = new ArrayList<ProducerRecord<String, String>>(msgs.size());
+ final List<ProducerRecord<String, String>> kms = new ArrayList<>(msgs.size());
for (message o : msgs) {
- final ProducerRecord<String, String> data = new ProducerRecord<String, String>(topic, o.getKey(), o.toString());
- //kms.add(data);
+ final ProducerRecord<String, String> data = new ProducerRecord<>(topic, o.getKey(), o.toString());
+
try {
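[Editor's note: the loop above builds one ProducerRecord per message with the diamond operator. A minimal sketch of the send path; topic, key, and payloads are placeholders.]

    import java.util.List;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class BatchSend {
        // send() is asynchronous and returns a Future<RecordMetadata>
        // that could be checked for per-record errors.
        static void sendAll(Producer<String, String> producer, String topic, List<String> payloads) {
            for (String payload : payloads) {
                ProducerRecord<String, String> record = new ProducerRecord<>(topic, "key", payload);
                producer.send(record);
            }
        }
    }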
@@ -200,7 +193,7 @@ try{
}
//private final rrNvReadable fSettings;
- //private ProducerConfig fConfig;
+
private Producer<String, String> fProducer;
/**
@@ -227,9 +220,5 @@ try{
}
- //@Override
- //public void sendBatchMessage(String topic, ArrayList<KeyedMessage<String, String>> kms) throws IOException {
- // TODO Auto-generated method stub
-
- //}
+
}
\ No newline at end of file
diff --git a/src/main/java/com/att/dmf/mr/backends/memory/MemoryConsumerFactory.java b/src/main/java/com/att/dmf/mr/backends/memory/MemoryConsumerFactory.java
index 0c34bfd..237cac8 100644
--- a/src/main/java/com/att/dmf/mr/backends/memory/MemoryConsumerFactory.java
+++ b/src/main/java/com/att/dmf/mr/backends/memory/MemoryConsumerFactory.java
@@ -35,6 +35,9 @@ import com.att.dmf.mr.backends.ConsumerFactory;
*/
public class MemoryConsumerFactory implements ConsumerFactory
{
+
+ private final MemoryQueue fQueue;
+
/**
*
* Initializing constructor
@@ -59,8 +62,6 @@ public class MemoryConsumerFactory implements ConsumerFactory
return new MemoryConsumer ( topic, consumerGroupId );
}
- private final MemoryQueue fQueue;
-
/**
*
* Define nested inner class
@@ -68,6 +69,12 @@ public class MemoryConsumerFactory implements ConsumerFactory
*/
private class MemoryConsumer implements Consumer
{
+
+ private final String fTopic;
+ private final String fConsumer;
+ private final long fCreateMs;
+ private long fLastAccessMs;
+
/**
*
* Initializing MemoryConsumer constructor
@@ -93,11 +100,6 @@ public class MemoryConsumerFactory implements ConsumerFactory
return fQueue.get ( fTopic, fConsumer );
}
- private final String fTopic;
- private final String fConsumer;
- private final long fCreateMs;
- private long fLastAccessMs;
-
@Override
public boolean close() {
//Nothing to close/clean up.
@@ -168,7 +170,7 @@ public class MemoryConsumerFactory implements ConsumerFactory
*/
public Collection<? extends Consumer> getConsumers ()
{
- return new ArrayList<MemoryConsumer> ();
+ return new ArrayList<> ();
}
@Override
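[Editor's note: several hunks in this file, and in MemoryMetaBroker below, move private field declarations from the bottom of the class to the top, ahead of the constructor. A minimal sketch of the resulting layout; names are illustrative.]

    public class MemoryConsumerSketch {
        // Fields first, so a reader sees the class state before the behavior.
        private final String fTopic;
        private long fLastAccessMs;

        public MemoryConsumerSketch(String topic) {
            fTopic = topic;
            fLastAccessMs = System.currentTimeMillis();
        }

        public String topic() { return fTopic; }
    }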
diff --git a/src/main/java/com/att/dmf/mr/backends/memory/MemoryMetaBroker.java b/src/main/java/com/att/dmf/mr/backends/memory/MemoryMetaBroker.java
index 22f0588..e0c80bd 100644
--- a/src/main/java/com/att/dmf/mr/backends/memory/MemoryMetaBroker.java
+++ b/src/main/java/com/att/dmf/mr/backends/memory/MemoryMetaBroker.java
@@ -39,6 +39,10 @@ import com.att.nsa.security.NsaApiKey;
*
*/
public class MemoryMetaBroker implements Broker {
+
+ private final MemoryQueue fQueue;
+ private final HashMap<String, MemTopic> fTopics;
+
/**
*
* @param mq
@@ -46,9 +50,9 @@ public class MemoryMetaBroker implements Broker {
* @param settings
*/
public MemoryMetaBroker(MemoryQueue mq, ConfigDb configDb) {
- //public MemoryMetaBroker(MemoryQueue mq, ConfigDb configDb, rrNvReadable settings) {
+
fQueue = mq;
- fTopics = new HashMap<String, MemTopic>();
+ fTopics = new HashMap<>();
}
@Override
@@ -78,10 +82,16 @@ public class MemoryMetaBroker implements Broker {
fQueue.removeTopic(topic);
}
- private final MemoryQueue fQueue;
- private final HashMap<String, MemTopic> fTopics;
-
private static class MemTopic implements Topic {
+
+ private final String fName;
+ private final String fDesc;
+ private final String fOwner;
+ private NsaAcl fReaders;
+ private NsaAcl fWriters;
+ private boolean ftransactionEnabled;
+ private String accessDenied = "User does not own this topic ";
+
/**
* constructor initialization
*
@@ -141,7 +151,7 @@ public class MemoryMetaBroker implements Broker {
@Override
public void permitWritesFromUser(String publisherId, NsaApiKey asUser) throws AccessDeniedException {
if (!fOwner.equals(asUser.getKey())) {
- throw new AccessDeniedException("User does not own this topic " + fName);
+ throw new AccessDeniedException(accessDenied + fName);
}
if (fWriters == null) {
fWriters = new NsaAcl();
@@ -152,7 +162,7 @@ public class MemoryMetaBroker implements Broker {
@Override
public void denyWritesFromUser(String publisherId, NsaApiKey asUser) throws AccessDeniedException {
if (!fOwner.equals(asUser.getKey())) {
- throw new AccessDeniedException("User does not own this topic " + fName);
+ throw new AccessDeniedException(accessDenied + fName);
}
fWriters.remove(publisherId);
}
@@ -160,7 +170,7 @@ public class MemoryMetaBroker implements Broker {
@Override
public void permitReadsByUser(String consumerId, NsaApiKey asUser) throws AccessDeniedException {
if (!fOwner.equals(asUser.getKey())) {
- throw new AccessDeniedException("User does not own this topic " + fName);
+ throw new AccessDeniedException(accessDenied + fName);
}
if (fReaders == null) {
fReaders = new NsaAcl();
@@ -171,18 +181,11 @@ public class MemoryMetaBroker implements Broker {
@Override
public void denyReadsByUser(String consumerId, NsaApiKey asUser) throws AccessDeniedException {
if (!fOwner.equals(asUser.getKey())) {
- throw new AccessDeniedException("User does not own this topic " + fName);
+ throw new AccessDeniedException(accessDenied + fName);
}
fReaders.remove(consumerId);
}
- private final String fName;
- private final String fDesc;
- private final String fOwner;
- private NsaAcl fReaders;
- private NsaAcl fWriters;
- private boolean ftransactionEnabled;
-
@Override
public boolean isTransactionEnabled() {
return ftransactionEnabled;
@@ -190,7 +193,7 @@ public class MemoryMetaBroker implements Broker {
@Override
public Set<String> getOwners() {
- final TreeSet<String> set = new TreeSet<String> ();
+ final TreeSet<String> set = new TreeSet<> ();
set.add ( fOwner );
return set;
}
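[Editor's note: MemTopic now reuses a single accessDenied field instead of repeating the literal at four throw sites. A sketch of the pattern; names are illustrative, and a static final constant (as below) would arguably be stronger than the instance field the diff uses.]

    public class AclGuard {
        private static final String ACCESS_DENIED = "User does not own this topic ";

        private final String fOwner;
        private final String fName;

        public AclGuard(String owner, String name) {
            fOwner = owner;
            fName = name;
        }

        // One shared message prefix keeps the four ACL mutators consistent
        // and satisfies duplicate-literal static-analysis rules.
        void checkOwner(String userKey) {
            if (!fOwner.equals(userKey)) {
                throw new SecurityException(ACCESS_DENIED + fName);
            }
        }
    }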
diff --git a/src/main/java/com/att/dmf/mr/backends/memory/MemoryQueue.java b/src/main/java/com/att/dmf/mr/backends/memory/MemoryQueue.java
index 0629972..8ab4619 100644
--- a/src/main/java/com/att/dmf/mr/backends/memory/MemoryQueue.java
+++ b/src/main/java/com/att/dmf/mr/backends/memory/MemoryQueue.java
@@ -43,7 +43,7 @@ public class MemoryQueue {
* constructor storing hashMap objects in Queue and Offsets object
*/
public MemoryQueue() {
- fQueue = new HashMap<String, LogBuffer>();
+ fQueue = new HashMap<>();
fOffsets = new HashMap<String, HashMap<String, Integer>>();
}
@@ -102,7 +102,7 @@ public class MemoryQueue {
HashMap<String, Integer> offsetMap = fOffsets.get(consumerName);
if (offsetMap == null) {
- offsetMap = new HashMap<String, Integer>();
+ offsetMap = new HashMap<>();
fOffsets.put(consumerName, offsetMap);
}
Integer offset = offsetMap.get(topic);
@@ -169,7 +169,7 @@ public class MemoryQueue {
public LogBuffer(int maxSize) {
fBaseOffset = 0;
fMaxSize = maxSize;
- fList = new ArrayList<String>();
+ fList = new ArrayList<>();
}
/**
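[Editor's note: LogBuffer, touched in the last hunk, appears to be a bounded in-memory log keyed by a base offset. A hypothetical sketch of such a structure, inferred only from the fields visible here (fBaseOffset, fMaxSize, fList); the real class may differ.]

    import java.util.ArrayList;

    // Hypothetical bounded log: keeps at most maxSize entries and advances
    // a base offset as old entries are evicted, mimicking a Kafka-style log.
    class BoundedLog {
        private int fBaseOffset = 0;
        private final int fMaxSize;
        private final ArrayList<String> fList = new ArrayList<>();

        BoundedLog(int maxSize) { fMaxSize = maxSize; }

        void append(String msg) {
            fList.add(msg);
            if (fList.size() > fMaxSize) {
                fList.remove(0);
                fBaseOffset++; // the oldest retained entry now has this offset
            }
        }

        String get(int offset) {
            int idx = offset - fBaseOffset;
            return (idx >= 0 && idx < fList.size()) ? fList.get(idx) : null;
        }
    }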