diff options
Diffstat (limited to 'src/main/java/com/att/dmf/mr/backends')
7 files changed, 75 insertions, 91 deletions
diff --git a/src/main/java/com/att/dmf/mr/backends/Consumer.java b/src/main/java/com/att/dmf/mr/backends/Consumer.java index 2743fc3..f4a9a80 100644 --- a/src/main/java/com/att/dmf/mr/backends/Consumer.java +++ b/src/main/java/com/att/dmf/mr/backends/Consumer.java @@ -21,7 +21,6 @@ *******************************************************************************/ package com.att.dmf.mr.backends; -import java.util.ArrayList; /** * A consumer interface. Consumers pull the next message from a given topic. diff --git a/src/main/java/com/att/dmf/mr/backends/kafka/KafkaConsumerCache.java b/src/main/java/com/att/dmf/mr/backends/kafka/KafkaConsumerCache.java index 126711a..83c08ec 100644 --- a/src/main/java/com/att/dmf/mr/backends/kafka/KafkaConsumerCache.java +++ b/src/main/java/com/att/dmf/mr/backends/kafka/KafkaConsumerCache.java @@ -61,8 +61,8 @@ import com.att.dmf.mr.backends.MetricsSet; import com.att.dmf.mr.constants.CambriaConstants; import com.att.dmf.mr.exception.DMaaPErrorMessages; import com.att.dmf.mr.utils.ConfigurationReader; -//import org.slf4j.Logger; -//import org.slf4j.LoggerFactory; + + import com.att.eelf.configuration.EELFLogger; import com.att.eelf.configuration.EELFManager; import com.att.nsa.metrics.CdmTimer; @@ -101,7 +101,7 @@ public class KafkaConsumerCache { // the server at least every 30 seconds, timing out after 2 minutes should // be okay. // FIXME: consider allowing the client to specify its expected call rate? - private static final long kDefault_MustTouchEveryMs = 1000 * 60 * 2; + private static final long kDefault_MustTouchEveryMs = 1000L*60*2; // check for expirations pretty regularly private static final long kDefault_SweepEverySeconds = 15; @@ -110,16 +110,13 @@ public class KafkaConsumerCache { NOT_STARTED, CONNECTED, DISCONNECTED, SUSPENDED } - // @Qualifier("kafkalockavoid") - - // @Resource - // @Qualifier("kafkalockavoid") - // KafkaLiveLockAvoider2 kafkaLiveLockAvoider; + + @Autowired private DMaaPErrorMessages errorMessages; - // KafkaLiveLockAvoider kafkaLiveLockAvoider = new KafkaLiveLockAvoider(); + /** * User defined exception class for kafka consumer cache * @@ -267,8 +264,8 @@ public class KafkaConsumerCache { EnsurePath ensurePath = new EnsurePath(fBaseZkPath); ensurePath.ensure(curator.getZookeeperClient()); - // final long freq = fSettings.getLong(kSetting_SweepEverySeconds, - // kDefault_SweepEverySeconds); + + long freq = kDefault_SweepEverySeconds; String strkSetting_SweepEverySeconds = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, kSetting_SweepEverySeconds); @@ -301,8 +298,8 @@ public class KafkaConsumerCache { try { curator.blockUntilConnected(); } catch (InterruptedException e) { - // Ignore - log.error("error while setting curator framework :" + e.getMessage()); + log.error("error while setting curator framework :",e); + Thread.currentThread().interrupt(); } } @@ -393,8 +390,8 @@ public class KafkaConsumerCache { if (getStatus() != KafkaConsumerCache.Status.CONNECTED) throw new KafkaConsumerCacheException("The cache service is unavailable."); ArrayList<Kafka011Consumer> kcl = new ArrayList<>(); - // final String consumerKey = makeConsumerKey(topic, consumerGroupId, - // clientId); + + Enumeration<String> strEnum = fConsumers.keys(); String consumerLocalKey = null; while (strEnum.hasMoreElements()) { @@ -402,9 +399,9 @@ public class KafkaConsumerCache { if (consumerLocalKey.startsWith(topicgroup) && (!consumerLocalKey.endsWith("::" + clientId))) { - // System.out.println("consumer key returning from - // getConsumerListforCG +++++++++ " + consumerLocalKey - // + " " + fConsumers.get(consumerLocalKey)); + + + kcl.add(fConsumers.get(consumerLocalKey)); } @@ -417,8 +414,7 @@ public class KafkaConsumerCache { if (getStatus() != KafkaConsumerCache.Status.CONNECTED) throw new KafkaConsumerCacheException("The cache service is unavailable."); ArrayList<Kafka011Consumer> kcl = new ArrayList<>(); - // final String consumerKey = makeConsumerKey(topic, consumerGroupId, - // clientId); + Enumeration<String> strEnum = fConsumers.keys(); String consumerLocalKey = null; while (strEnum.hasMoreElements()) { @@ -426,9 +422,7 @@ public class KafkaConsumerCache { if (consumerLocalKey.startsWith(group)) { - // System.out.println("consumer key returning from - // getConsumerListforCG +++++++++ " + consumerLocalKey - // + " " + fConsumers.get(consumerLocalKey)); + kcl.add(fConsumers.get(consumerLocalKey)); } @@ -454,7 +448,7 @@ public class KafkaConsumerCache { final String consumerKey = makeConsumerKey(topic, consumerGroupId, consumerId); fConsumers.put(consumerKey, consumer); - // String appId = "node-instance-"+i; + log.info("^@ Consumer Added to Cache Consumer Key" + consumerKey + " ApiId" + fApiId); } @@ -517,7 +511,8 @@ public class KafkaConsumerCache { consumerHandoverWaitMs = Integer.parseInt(strkSetting_ConsumerHandoverWaitMs); Thread.sleep(consumerHandoverWaitMs); } catch (InterruptedException e) { - // Ignore + log.error("InterruptedException in dropTimedOutConsumer",e); + Thread.currentThread().interrupt(); } log.info("Dropped " + key + " consumer due to timeout"); } @@ -549,7 +544,7 @@ public class KafkaConsumerCache { final Kafka011Consumer kc = fConsumers.get(key); log.info("closing Kafka consumer " + key + " object " + kc); if (kc != null) { - // log.info("closing Kafka consumer " + key); + if (kc.close()) { fConsumers.remove(key); @@ -644,9 +639,8 @@ public class KafkaConsumerCache { throws KafkaConsumerCacheException { // get a lock at <base>/<topic>::<consumerGroupId>::<consumerId> final String consumerKey = makeConsumerKey(topic, consumerGroupId, consumerId); - final CdmTimer timer = new CdmTimer(fMetrics, "CacheSignalOwnership"); - try { + try(final CdmTimer timer = new CdmTimer(fMetrics, "CacheSignalOwnership")) { final String consumerPath = fBaseZkPath + "/" + consumerKey; log.debug(fApiId + " attempting to claim ownership of consumer " + consumerKey); final CuratorFramework curator = ConfigurationReader.getCurator(); @@ -673,7 +667,8 @@ public class KafkaConsumerCache { consumerHandoverWaitMs = Integer.parseInt(strkSetting_ConsumerHandoverWaitMs); Thread.sleep(consumerHandoverWaitMs); } catch (InterruptedException e) { - // Ignore + log.error("InterruptedException in signalOwnership",e); + Thread.currentThread().interrupt(); } } @@ -690,8 +685,7 @@ public class KafkaConsumerCache { mustTouchEveryMs = Long.parseLong(strkSetting_TouchEveryMs); } - // final long mustTouchEveryMs = - // fSettings.getLong(kSetting_TouchEveryMs, kDefault_MustTouchEveryMs); + final long oldestAllowedTouchMs = System.currentTimeMillis() - mustTouchEveryMs; for (Entry<String, Kafka011Consumer> e : fConsumers.entrySet()) { @@ -744,6 +738,5 @@ public class KafkaConsumerCache { } private static final EELFLogger log = EELFManager.getInstance().getLogger(KafkaConsumerCache.class); - // private static final Logger log = - // LoggerFactory.getLogger(KafkaConsumerCache.class); + }
\ No newline at end of file diff --git a/src/main/java/com/att/dmf/mr/backends/kafka/KafkaLiveLockAvoider2.java b/src/main/java/com/att/dmf/mr/backends/kafka/KafkaLiveLockAvoider2.java index 805701a..f521b41 100644 --- a/src/main/java/com/att/dmf/mr/backends/kafka/KafkaLiveLockAvoider2.java +++ b/src/main/java/com/att/dmf/mr/backends/kafka/KafkaLiveLockAvoider2.java @@ -35,9 +35,7 @@ import com.att.eelf.configuration.EELFLogger; import com.att.eelf.configuration.EELFManager; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; -import org.springframework.context.annotation.ComponentScan; import org.springframework.stereotype.Component; -import org.springframework.stereotype.Service; //@ComponentScan(basePackages="com.att.dmf.mr.backends.kafka") @Component @@ -57,7 +55,7 @@ public class KafkaLiveLockAvoider2 { @PostConstruct public void init() { - System.out.println("Welcome......................................................................................"); + log.info("Welcome......................................................................................"); try { if (curatorFramework.checkExists().forPath(locksPath) == null) { curatorFramework.create().creatingParentsIfNeeded().forPath(locksPath); @@ -67,7 +65,7 @@ public class KafkaLiveLockAvoider2 { } } catch (Exception e) { - //e.printStackTrace(); + log.error("Error during creation of permanent Znodes under /live-lock-avoid ",e); } @@ -138,7 +136,7 @@ public class KafkaLiveLockAvoider2 { protected void assignNewProcessNode(String appId, Watcher processNodeWatcher ) { String taskHolderZnodePath = ZNODE_ROOT+ZNODE_UNSTICK_TASKS+"/"+appId; - //Watcher processNodeWatcher = createWatcher(); + try { diff --git a/src/main/java/com/att/dmf/mr/backends/kafka/KafkaPublisher.java b/src/main/java/com/att/dmf/mr/backends/kafka/KafkaPublisher.java index 30209f0..735e372 100644 --- a/src/main/java/com/att/dmf/mr/backends/kafka/KafkaPublisher.java +++ b/src/main/java/com/att/dmf/mr/backends/kafka/KafkaPublisher.java @@ -41,11 +41,7 @@ import com.att.eelf.configuration.EELFLogger; import com.att.eelf.configuration.EELFManager; import com.att.nsa.drumlin.till.nv.rrNvReadable; -//import kafka.FailedToSendMessageException; -//import kafka.javaapi.producer.Producer; -//import kafka.producer.KeyedMessage; -//import kafka.producer.ProducerConfig; -//import kafka.producer.KeyedMessage; + /** * Sends raw JSON objects into Kafka. @@ -76,26 +72,23 @@ public class KafkaPublisher implements Publisher { kafkaConnUrl="localhost:9092"; } - //String jaaspath="C:/ATT/Apps/dmaapCodedmaap-framework/dmaap/bundleconfig-local/etc/appprops/kafka_pub_jaas.conf"; - // props.put("bootstrap.servers", bootSever); - //System.setProperty("java.security.auth.login.config",jaaspath); + - /*transferSetting( props, "sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='admin_secret';"); - transferSetting( props, "security.protocol", "SASL_PLAINTEXT"); - transferSetting( props, "sasl.mechanism", "PLAIN");*/ + + transferSetting( props, "bootstrap.servers",kafkaConnUrl); - //transferSetting( props, "metadata.broker.list", kafkaConnUrl); + transferSetting( props, "request.required.acks", "1"); transferSetting( props, "message.send.max.retries", "5"); transferSetting(props, "retry.backoff.ms", "150"); - //props.put("serializer.class", "kafka.serializer.StringEncoder"); + props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); - //fConfig = new ProducerConfig(props); - //fProducer = new Producer<String, String>(fConfig); + + fProducer = new KafkaProducer<>(props); } @@ -180,11 +173,11 @@ public class KafkaPublisher implements Publisher { throws IOException { log.info("sending " + msgs.size() + " events to [" + topic + "]"); try{ - final List<ProducerRecord<String, String>> kms = new ArrayList<ProducerRecord<String, String>>(msgs.size()); + final List<ProducerRecord<String, String>> kms = new ArrayList<>(msgs.size()); for (message o : msgs) { - final ProducerRecord<String, String> data = new ProducerRecord<String, String>(topic, o.getKey(), o.toString()); - //kms.add(data); + final ProducerRecord<String, String> data = new ProducerRecord<>(topic, o.getKey(), o.toString()); + try { @@ -200,7 +193,7 @@ try{ } //private final rrNvReadable fSettings; - //private ProducerConfig fConfig; + private Producer<String, String> fProducer; /** @@ -227,9 +220,5 @@ try{ } - //@Override - //public void sendBatchMessage(String topic, ArrayList<KeyedMessage<String, String>> kms) throws IOException { - // TODO Auto-generated method stub - - //} + }
\ No newline at end of file diff --git a/src/main/java/com/att/dmf/mr/backends/memory/MemoryConsumerFactory.java b/src/main/java/com/att/dmf/mr/backends/memory/MemoryConsumerFactory.java index 0c34bfd..237cac8 100644 --- a/src/main/java/com/att/dmf/mr/backends/memory/MemoryConsumerFactory.java +++ b/src/main/java/com/att/dmf/mr/backends/memory/MemoryConsumerFactory.java @@ -35,6 +35,9 @@ import com.att.dmf.mr.backends.ConsumerFactory; */ public class MemoryConsumerFactory implements ConsumerFactory { + + private final MemoryQueue fQueue; + /** * * Initializing constructor @@ -59,8 +62,6 @@ public class MemoryConsumerFactory implements ConsumerFactory return new MemoryConsumer ( topic, consumerGroupId ); } - private final MemoryQueue fQueue; - /** * * Define nested inner class @@ -68,6 +69,12 @@ public class MemoryConsumerFactory implements ConsumerFactory */ private class MemoryConsumer implements Consumer { + + private final String fTopic; + private final String fConsumer; + private final long fCreateMs; + private long fLastAccessMs; + /** * * Initializing MemoryConsumer constructor @@ -93,11 +100,6 @@ public class MemoryConsumerFactory implements ConsumerFactory return fQueue.get ( fTopic, fConsumer ); } - private final String fTopic; - private final String fConsumer; - private final long fCreateMs; - private long fLastAccessMs; - @Override public boolean close() { //Nothing to close/clean up. @@ -168,7 +170,7 @@ public class MemoryConsumerFactory implements ConsumerFactory */ public Collection<? extends Consumer> getConsumers () { - return new ArrayList<MemoryConsumer> (); + return new ArrayList<> (); } @Override diff --git a/src/main/java/com/att/dmf/mr/backends/memory/MemoryMetaBroker.java b/src/main/java/com/att/dmf/mr/backends/memory/MemoryMetaBroker.java index 22f0588..e0c80bd 100644 --- a/src/main/java/com/att/dmf/mr/backends/memory/MemoryMetaBroker.java +++ b/src/main/java/com/att/dmf/mr/backends/memory/MemoryMetaBroker.java @@ -39,6 +39,10 @@ import com.att.nsa.security.NsaApiKey; * */ public class MemoryMetaBroker implements Broker { + + private final MemoryQueue fQueue; + private final HashMap<String, MemTopic> fTopics; + /** * * @param mq @@ -46,9 +50,9 @@ public class MemoryMetaBroker implements Broker { * @param settings */ public MemoryMetaBroker(MemoryQueue mq, ConfigDb configDb) { - //public MemoryMetaBroker(MemoryQueue mq, ConfigDb configDb, rrNvReadable settings) { + fQueue = mq; - fTopics = new HashMap<String, MemTopic>(); + fTopics = new HashMap<>(); } @Override @@ -78,10 +82,16 @@ public class MemoryMetaBroker implements Broker { fQueue.removeTopic(topic); } - private final MemoryQueue fQueue; - private final HashMap<String, MemTopic> fTopics; - private static class MemTopic implements Topic { + + private final String fName; + private final String fDesc; + private final String fOwner; + private NsaAcl fReaders; + private NsaAcl fWriters; + private boolean ftransactionEnabled; + private String accessDenied = "User does not own this topic "; + /** * constructor initialization * @@ -141,7 +151,7 @@ public class MemoryMetaBroker implements Broker { @Override public void permitWritesFromUser(String publisherId, NsaApiKey asUser) throws AccessDeniedException { if (!fOwner.equals(asUser.getKey())) { - throw new AccessDeniedException("User does not own this topic " + fName); + throw new AccessDeniedException(accessDenied + fName); } if (fWriters == null) { fWriters = new NsaAcl(); @@ -152,7 +162,7 @@ public class MemoryMetaBroker implements Broker { @Override public void denyWritesFromUser(String publisherId, NsaApiKey asUser) throws AccessDeniedException { if (!fOwner.equals(asUser.getKey())) { - throw new AccessDeniedException("User does not own this topic " + fName); + throw new AccessDeniedException(accessDenied + fName); } fWriters.remove(publisherId); } @@ -160,7 +170,7 @@ public class MemoryMetaBroker implements Broker { @Override public void permitReadsByUser(String consumerId, NsaApiKey asUser) throws AccessDeniedException { if (!fOwner.equals(asUser.getKey())) { - throw new AccessDeniedException("User does not own this topic " + fName); + throw new AccessDeniedException(accessDenied + fName); } if (fReaders == null) { fReaders = new NsaAcl(); @@ -171,18 +181,11 @@ public class MemoryMetaBroker implements Broker { @Override public void denyReadsByUser(String consumerId, NsaApiKey asUser) throws AccessDeniedException { if (!fOwner.equals(asUser.getKey())) { - throw new AccessDeniedException("User does not own this topic " + fName); + throw new AccessDeniedException(accessDenied + fName); } fReaders.remove(consumerId); } - private final String fName; - private final String fDesc; - private final String fOwner; - private NsaAcl fReaders; - private NsaAcl fWriters; - private boolean ftransactionEnabled; - @Override public boolean isTransactionEnabled() { return ftransactionEnabled; @@ -190,7 +193,7 @@ public class MemoryMetaBroker implements Broker { @Override public Set<String> getOwners() { - final TreeSet<String> set = new TreeSet<String> (); + final TreeSet<String> set = new TreeSet<> (); set.add ( fOwner ); return set; } diff --git a/src/main/java/com/att/dmf/mr/backends/memory/MemoryQueue.java b/src/main/java/com/att/dmf/mr/backends/memory/MemoryQueue.java index 0629972..8ab4619 100644 --- a/src/main/java/com/att/dmf/mr/backends/memory/MemoryQueue.java +++ b/src/main/java/com/att/dmf/mr/backends/memory/MemoryQueue.java @@ -43,7 +43,7 @@ public class MemoryQueue { * constructor storing hashMap objects in Queue and Offsets object */ public MemoryQueue() { - fQueue = new HashMap<String, LogBuffer>(); + fQueue = new HashMap<>(); fOffsets = new HashMap<String, HashMap<String, Integer>>(); } @@ -102,7 +102,7 @@ public class MemoryQueue { HashMap<String, Integer> offsetMap = fOffsets.get(consumerName); if (offsetMap == null) { - offsetMap = new HashMap<String, Integer>(); + offsetMap = new HashMap<>(); fOffsets.put(consumerName, offsetMap); } Integer offset = offsetMap.get(topic); @@ -169,7 +169,7 @@ public class MemoryQueue { public LogBuffer(int maxSize) { fBaseOffset = 0; fMaxSize = maxSize; - fList = new ArrayList<String>(); + fList = new ArrayList<>(); } /** |