diff options
Diffstat (limited to 'src/main/java')
40 files changed, 370 insertions, 484 deletions
diff --git a/src/main/java/com/att/dmf/mr/CambriaApiException.java b/src/main/java/com/att/dmf/mr/CambriaApiException.java index 84dd32c..cdf95ab 100644 --- a/src/main/java/com/att/dmf/mr/CambriaApiException.java +++ b/src/main/java/com/att/dmf/mr/CambriaApiException.java @@ -28,8 +28,12 @@ import com.att.nsa.apiServer.NsaAppException; public class CambriaApiException extends NsaAppException { + /* + * defined long type constant serialVersionUID + */ + private static final long serialVersionUID = 1L; - private ErrorResponse errRes; + private transient ErrorResponse errRes; /** * Implements constructor CambriaApiException * @param jsonObject @@ -66,10 +70,6 @@ public class CambriaApiException extends NsaAppException this.errRes = errRes; } - /* - * defined long type constant serialVersionUID - */ - private static final long serialVersionUID = 1L; public ErrorResponse getErrRes() { return errRes; } diff --git a/src/main/java/com/att/dmf/mr/backends/Consumer.java b/src/main/java/com/att/dmf/mr/backends/Consumer.java index 2743fc3..f4a9a80 100644 --- a/src/main/java/com/att/dmf/mr/backends/Consumer.java +++ b/src/main/java/com/att/dmf/mr/backends/Consumer.java @@ -21,7 +21,6 @@ *******************************************************************************/ package com.att.dmf.mr.backends; -import java.util.ArrayList; /** * A consumer interface. Consumers pull the next message from a given topic. diff --git a/src/main/java/com/att/dmf/mr/backends/kafka/KafkaConsumerCache.java b/src/main/java/com/att/dmf/mr/backends/kafka/KafkaConsumerCache.java index 126711a..83c08ec 100644 --- a/src/main/java/com/att/dmf/mr/backends/kafka/KafkaConsumerCache.java +++ b/src/main/java/com/att/dmf/mr/backends/kafka/KafkaConsumerCache.java @@ -61,8 +61,8 @@ import com.att.dmf.mr.backends.MetricsSet; import com.att.dmf.mr.constants.CambriaConstants; import com.att.dmf.mr.exception.DMaaPErrorMessages; import com.att.dmf.mr.utils.ConfigurationReader; -//import org.slf4j.Logger; -//import org.slf4j.LoggerFactory; + + import com.att.eelf.configuration.EELFLogger; import com.att.eelf.configuration.EELFManager; import com.att.nsa.metrics.CdmTimer; @@ -101,7 +101,7 @@ public class KafkaConsumerCache { // the server at least every 30 seconds, timing out after 2 minutes should // be okay. // FIXME: consider allowing the client to specify its expected call rate? - private static final long kDefault_MustTouchEveryMs = 1000 * 60 * 2; + private static final long kDefault_MustTouchEveryMs = 1000L*60*2; // check for expirations pretty regularly private static final long kDefault_SweepEverySeconds = 15; @@ -110,16 +110,13 @@ public class KafkaConsumerCache { NOT_STARTED, CONNECTED, DISCONNECTED, SUSPENDED } - // @Qualifier("kafkalockavoid") - - // @Resource - // @Qualifier("kafkalockavoid") - // KafkaLiveLockAvoider2 kafkaLiveLockAvoider; + + @Autowired private DMaaPErrorMessages errorMessages; - // KafkaLiveLockAvoider kafkaLiveLockAvoider = new KafkaLiveLockAvoider(); + /** * User defined exception class for kafka consumer cache * @@ -267,8 +264,8 @@ public class KafkaConsumerCache { EnsurePath ensurePath = new EnsurePath(fBaseZkPath); ensurePath.ensure(curator.getZookeeperClient()); - // final long freq = fSettings.getLong(kSetting_SweepEverySeconds, - // kDefault_SweepEverySeconds); + + long freq = kDefault_SweepEverySeconds; String strkSetting_SweepEverySeconds = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, kSetting_SweepEverySeconds); @@ -301,8 +298,8 @@ public class KafkaConsumerCache { try { curator.blockUntilConnected(); } catch (InterruptedException e) { - // Ignore - log.error("error while setting curator framework :" + e.getMessage()); + log.error("error while setting curator framework :",e); + Thread.currentThread().interrupt(); } } @@ -393,8 +390,8 @@ public class KafkaConsumerCache { if (getStatus() != KafkaConsumerCache.Status.CONNECTED) throw new KafkaConsumerCacheException("The cache service is unavailable."); ArrayList<Kafka011Consumer> kcl = new ArrayList<>(); - // final String consumerKey = makeConsumerKey(topic, consumerGroupId, - // clientId); + + Enumeration<String> strEnum = fConsumers.keys(); String consumerLocalKey = null; while (strEnum.hasMoreElements()) { @@ -402,9 +399,9 @@ public class KafkaConsumerCache { if (consumerLocalKey.startsWith(topicgroup) && (!consumerLocalKey.endsWith("::" + clientId))) { - // System.out.println("consumer key returning from - // getConsumerListforCG +++++++++ " + consumerLocalKey - // + " " + fConsumers.get(consumerLocalKey)); + + + kcl.add(fConsumers.get(consumerLocalKey)); } @@ -417,8 +414,7 @@ public class KafkaConsumerCache { if (getStatus() != KafkaConsumerCache.Status.CONNECTED) throw new KafkaConsumerCacheException("The cache service is unavailable."); ArrayList<Kafka011Consumer> kcl = new ArrayList<>(); - // final String consumerKey = makeConsumerKey(topic, consumerGroupId, - // clientId); + Enumeration<String> strEnum = fConsumers.keys(); String consumerLocalKey = null; while (strEnum.hasMoreElements()) { @@ -426,9 +422,7 @@ public class KafkaConsumerCache { if (consumerLocalKey.startsWith(group)) { - // System.out.println("consumer key returning from - // getConsumerListforCG +++++++++ " + consumerLocalKey - // + " " + fConsumers.get(consumerLocalKey)); + kcl.add(fConsumers.get(consumerLocalKey)); } @@ -454,7 +448,7 @@ public class KafkaConsumerCache { final String consumerKey = makeConsumerKey(topic, consumerGroupId, consumerId); fConsumers.put(consumerKey, consumer); - // String appId = "node-instance-"+i; + log.info("^@ Consumer Added to Cache Consumer Key" + consumerKey + " ApiId" + fApiId); } @@ -517,7 +511,8 @@ public class KafkaConsumerCache { consumerHandoverWaitMs = Integer.parseInt(strkSetting_ConsumerHandoverWaitMs); Thread.sleep(consumerHandoverWaitMs); } catch (InterruptedException e) { - // Ignore + log.error("InterruptedException in dropTimedOutConsumer",e); + Thread.currentThread().interrupt(); } log.info("Dropped " + key + " consumer due to timeout"); } @@ -549,7 +544,7 @@ public class KafkaConsumerCache { final Kafka011Consumer kc = fConsumers.get(key); log.info("closing Kafka consumer " + key + " object " + kc); if (kc != null) { - // log.info("closing Kafka consumer " + key); + if (kc.close()) { fConsumers.remove(key); @@ -644,9 +639,8 @@ public class KafkaConsumerCache { throws KafkaConsumerCacheException { // get a lock at <base>/<topic>::<consumerGroupId>::<consumerId> final String consumerKey = makeConsumerKey(topic, consumerGroupId, consumerId); - final CdmTimer timer = new CdmTimer(fMetrics, "CacheSignalOwnership"); - try { + try(final CdmTimer timer = new CdmTimer(fMetrics, "CacheSignalOwnership")) { final String consumerPath = fBaseZkPath + "/" + consumerKey; log.debug(fApiId + " attempting to claim ownership of consumer " + consumerKey); final CuratorFramework curator = ConfigurationReader.getCurator(); @@ -673,7 +667,8 @@ public class KafkaConsumerCache { consumerHandoverWaitMs = Integer.parseInt(strkSetting_ConsumerHandoverWaitMs); Thread.sleep(consumerHandoverWaitMs); } catch (InterruptedException e) { - // Ignore + log.error("InterruptedException in signalOwnership",e); + Thread.currentThread().interrupt(); } } @@ -690,8 +685,7 @@ public class KafkaConsumerCache { mustTouchEveryMs = Long.parseLong(strkSetting_TouchEveryMs); } - // final long mustTouchEveryMs = - // fSettings.getLong(kSetting_TouchEveryMs, kDefault_MustTouchEveryMs); + final long oldestAllowedTouchMs = System.currentTimeMillis() - mustTouchEveryMs; for (Entry<String, Kafka011Consumer> e : fConsumers.entrySet()) { @@ -744,6 +738,5 @@ public class KafkaConsumerCache { } private static final EELFLogger log = EELFManager.getInstance().getLogger(KafkaConsumerCache.class); - // private static final Logger log = - // LoggerFactory.getLogger(KafkaConsumerCache.class); + }
\ No newline at end of file diff --git a/src/main/java/com/att/dmf/mr/backends/kafka/KafkaLiveLockAvoider2.java b/src/main/java/com/att/dmf/mr/backends/kafka/KafkaLiveLockAvoider2.java index 805701a..f521b41 100644 --- a/src/main/java/com/att/dmf/mr/backends/kafka/KafkaLiveLockAvoider2.java +++ b/src/main/java/com/att/dmf/mr/backends/kafka/KafkaLiveLockAvoider2.java @@ -35,9 +35,7 @@ import com.att.eelf.configuration.EELFLogger; import com.att.eelf.configuration.EELFManager; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; -import org.springframework.context.annotation.ComponentScan; import org.springframework.stereotype.Component; -import org.springframework.stereotype.Service; //@ComponentScan(basePackages="com.att.dmf.mr.backends.kafka") @Component @@ -57,7 +55,7 @@ public class KafkaLiveLockAvoider2 { @PostConstruct public void init() { - System.out.println("Welcome......................................................................................"); + log.info("Welcome......................................................................................"); try { if (curatorFramework.checkExists().forPath(locksPath) == null) { curatorFramework.create().creatingParentsIfNeeded().forPath(locksPath); @@ -67,7 +65,7 @@ public class KafkaLiveLockAvoider2 { } } catch (Exception e) { - //e.printStackTrace(); + log.error("Error during creation of permanent Znodes under /live-lock-avoid ",e); } @@ -138,7 +136,7 @@ public class KafkaLiveLockAvoider2 { protected void assignNewProcessNode(String appId, Watcher processNodeWatcher ) { String taskHolderZnodePath = ZNODE_ROOT+ZNODE_UNSTICK_TASKS+"/"+appId; - //Watcher processNodeWatcher = createWatcher(); + try { diff --git a/src/main/java/com/att/dmf/mr/backends/kafka/KafkaPublisher.java b/src/main/java/com/att/dmf/mr/backends/kafka/KafkaPublisher.java index 30209f0..735e372 100644 --- a/src/main/java/com/att/dmf/mr/backends/kafka/KafkaPublisher.java +++ b/src/main/java/com/att/dmf/mr/backends/kafka/KafkaPublisher.java @@ -41,11 +41,7 @@ import com.att.eelf.configuration.EELFLogger; import com.att.eelf.configuration.EELFManager; import com.att.nsa.drumlin.till.nv.rrNvReadable; -//import kafka.FailedToSendMessageException; -//import kafka.javaapi.producer.Producer; -//import kafka.producer.KeyedMessage; -//import kafka.producer.ProducerConfig; -//import kafka.producer.KeyedMessage; + /** * Sends raw JSON objects into Kafka. @@ -76,26 +72,23 @@ public class KafkaPublisher implements Publisher { kafkaConnUrl="localhost:9092"; } - //String jaaspath="C:/ATT/Apps/dmaapCodedmaap-framework/dmaap/bundleconfig-local/etc/appprops/kafka_pub_jaas.conf"; - // props.put("bootstrap.servers", bootSever); - //System.setProperty("java.security.auth.login.config",jaaspath); + - /*transferSetting( props, "sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='admin_secret';"); - transferSetting( props, "security.protocol", "SASL_PLAINTEXT"); - transferSetting( props, "sasl.mechanism", "PLAIN");*/ + + transferSetting( props, "bootstrap.servers",kafkaConnUrl); - //transferSetting( props, "metadata.broker.list", kafkaConnUrl); + transferSetting( props, "request.required.acks", "1"); transferSetting( props, "message.send.max.retries", "5"); transferSetting(props, "retry.backoff.ms", "150"); - //props.put("serializer.class", "kafka.serializer.StringEncoder"); + props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); - //fConfig = new ProducerConfig(props); - //fProducer = new Producer<String, String>(fConfig); + + fProducer = new KafkaProducer<>(props); } @@ -180,11 +173,11 @@ public class KafkaPublisher implements Publisher { throws IOException { log.info("sending " + msgs.size() + " events to [" + topic + "]"); try{ - final List<ProducerRecord<String, String>> kms = new ArrayList<ProducerRecord<String, String>>(msgs.size()); + final List<ProducerRecord<String, String>> kms = new ArrayList<>(msgs.size()); for (message o : msgs) { - final ProducerRecord<String, String> data = new ProducerRecord<String, String>(topic, o.getKey(), o.toString()); - //kms.add(data); + final ProducerRecord<String, String> data = new ProducerRecord<>(topic, o.getKey(), o.toString()); + try { @@ -200,7 +193,7 @@ try{ } //private final rrNvReadable fSettings; - //private ProducerConfig fConfig; + private Producer<String, String> fProducer; /** @@ -227,9 +220,5 @@ try{ } - //@Override - //public void sendBatchMessage(String topic, ArrayList<KeyedMessage<String, String>> kms) throws IOException { - // TODO Auto-generated method stub - - //} + }
\ No newline at end of file diff --git a/src/main/java/com/att/dmf/mr/backends/memory/MemoryConsumerFactory.java b/src/main/java/com/att/dmf/mr/backends/memory/MemoryConsumerFactory.java index 0c34bfd..237cac8 100644 --- a/src/main/java/com/att/dmf/mr/backends/memory/MemoryConsumerFactory.java +++ b/src/main/java/com/att/dmf/mr/backends/memory/MemoryConsumerFactory.java @@ -35,6 +35,9 @@ import com.att.dmf.mr.backends.ConsumerFactory; */ public class MemoryConsumerFactory implements ConsumerFactory { + + private final MemoryQueue fQueue; + /** * * Initializing constructor @@ -59,8 +62,6 @@ public class MemoryConsumerFactory implements ConsumerFactory return new MemoryConsumer ( topic, consumerGroupId ); } - private final MemoryQueue fQueue; - /** * * Define nested inner class @@ -68,6 +69,12 @@ public class MemoryConsumerFactory implements ConsumerFactory */ private class MemoryConsumer implements Consumer { + + private final String fTopic; + private final String fConsumer; + private final long fCreateMs; + private long fLastAccessMs; + /** * * Initializing MemoryConsumer constructor @@ -93,11 +100,6 @@ public class MemoryConsumerFactory implements ConsumerFactory return fQueue.get ( fTopic, fConsumer ); } - private final String fTopic; - private final String fConsumer; - private final long fCreateMs; - private long fLastAccessMs; - @Override public boolean close() { //Nothing to close/clean up. @@ -168,7 +170,7 @@ public class MemoryConsumerFactory implements ConsumerFactory */ public Collection<? extends Consumer> getConsumers () { - return new ArrayList<MemoryConsumer> (); + return new ArrayList<> (); } @Override diff --git a/src/main/java/com/att/dmf/mr/backends/memory/MemoryMetaBroker.java b/src/main/java/com/att/dmf/mr/backends/memory/MemoryMetaBroker.java index 22f0588..e0c80bd 100644 --- a/src/main/java/com/att/dmf/mr/backends/memory/MemoryMetaBroker.java +++ b/src/main/java/com/att/dmf/mr/backends/memory/MemoryMetaBroker.java @@ -39,6 +39,10 @@ import com.att.nsa.security.NsaApiKey; * */ public class MemoryMetaBroker implements Broker { + + private final MemoryQueue fQueue; + private final HashMap<String, MemTopic> fTopics; + /** * * @param mq @@ -46,9 +50,9 @@ public class MemoryMetaBroker implements Broker { * @param settings */ public MemoryMetaBroker(MemoryQueue mq, ConfigDb configDb) { - //public MemoryMetaBroker(MemoryQueue mq, ConfigDb configDb, rrNvReadable settings) { + fQueue = mq; - fTopics = new HashMap<String, MemTopic>(); + fTopics = new HashMap<>(); } @Override @@ -78,10 +82,16 @@ public class MemoryMetaBroker implements Broker { fQueue.removeTopic(topic); } - private final MemoryQueue fQueue; - private final HashMap<String, MemTopic> fTopics; - private static class MemTopic implements Topic { + + private final String fName; + private final String fDesc; + private final String fOwner; + private NsaAcl fReaders; + private NsaAcl fWriters; + private boolean ftransactionEnabled; + private String accessDenied = "User does not own this topic "; + /** * constructor initialization * @@ -141,7 +151,7 @@ public class MemoryMetaBroker implements Broker { @Override public void permitWritesFromUser(String publisherId, NsaApiKey asUser) throws AccessDeniedException { if (!fOwner.equals(asUser.getKey())) { - throw new AccessDeniedException("User does not own this topic " + fName); + throw new AccessDeniedException(accessDenied + fName); } if (fWriters == null) { fWriters = new NsaAcl(); @@ -152,7 +162,7 @@ public class MemoryMetaBroker implements Broker { @Override public void denyWritesFromUser(String publisherId, NsaApiKey asUser) throws AccessDeniedException { if (!fOwner.equals(asUser.getKey())) { - throw new AccessDeniedException("User does not own this topic " + fName); + throw new AccessDeniedException(accessDenied + fName); } fWriters.remove(publisherId); } @@ -160,7 +170,7 @@ public class MemoryMetaBroker implements Broker { @Override public void permitReadsByUser(String consumerId, NsaApiKey asUser) throws AccessDeniedException { if (!fOwner.equals(asUser.getKey())) { - throw new AccessDeniedException("User does not own this topic " + fName); + throw new AccessDeniedException(accessDenied + fName); } if (fReaders == null) { fReaders = new NsaAcl(); @@ -171,18 +181,11 @@ public class MemoryMetaBroker implements Broker { @Override public void denyReadsByUser(String consumerId, NsaApiKey asUser) throws AccessDeniedException { if (!fOwner.equals(asUser.getKey())) { - throw new AccessDeniedException("User does not own this topic " + fName); + throw new AccessDeniedException(accessDenied + fName); } fReaders.remove(consumerId); } - private final String fName; - private final String fDesc; - private final String fOwner; - private NsaAcl fReaders; - private NsaAcl fWriters; - private boolean ftransactionEnabled; - @Override public boolean isTransactionEnabled() { return ftransactionEnabled; @@ -190,7 +193,7 @@ public class MemoryMetaBroker implements Broker { @Override public Set<String> getOwners() { - final TreeSet<String> set = new TreeSet<String> (); + final TreeSet<String> set = new TreeSet<> (); set.add ( fOwner ); return set; } diff --git a/src/main/java/com/att/dmf/mr/backends/memory/MemoryQueue.java b/src/main/java/com/att/dmf/mr/backends/memory/MemoryQueue.java index 0629972..8ab4619 100644 --- a/src/main/java/com/att/dmf/mr/backends/memory/MemoryQueue.java +++ b/src/main/java/com/att/dmf/mr/backends/memory/MemoryQueue.java @@ -43,7 +43,7 @@ public class MemoryQueue { * constructor storing hashMap objects in Queue and Offsets object */ public MemoryQueue() { - fQueue = new HashMap<String, LogBuffer>(); + fQueue = new HashMap<>(); fOffsets = new HashMap<String, HashMap<String, Integer>>(); } @@ -102,7 +102,7 @@ public class MemoryQueue { HashMap<String, Integer> offsetMap = fOffsets.get(consumerName); if (offsetMap == null) { - offsetMap = new HashMap<String, Integer>(); + offsetMap = new HashMap<>(); fOffsets.put(consumerName, offsetMap); } Integer offset = offsetMap.get(topic); @@ -169,7 +169,7 @@ public class MemoryQueue { public LogBuffer(int maxSize) { fBaseOffset = 0; fMaxSize = maxSize; - fList = new ArrayList<String>(); + fList = new ArrayList<>(); } /** diff --git a/src/main/java/com/att/dmf/mr/beans/DMaaPCambriaLimiter.java b/src/main/java/com/att/dmf/mr/beans/DMaaPCambriaLimiter.java index 5f28367..8cbf64f 100644 --- a/src/main/java/com/att/dmf/mr/beans/DMaaPCambriaLimiter.java +++ b/src/main/java/com/att/dmf/mr/beans/DMaaPCambriaLimiter.java @@ -35,9 +35,6 @@ import com.att.dmf.mr.exception.DMaaPResponseCode; import com.att.dmf.mr.exception.ErrorResponse; import com.att.dmf.mr.utils.Utils; -//import org.slf4j.Logger; -//import org.slf4j.LoggerFactory; - import com.att.eelf.configuration.EELFLogger; import com.att.eelf.configuration.EELFManager; import com.att.nsa.drumlin.service.standards.HttpStatusCodes; @@ -54,6 +51,15 @@ import com.att.nsa.metrics.impl.CdmRateTicker; */ @Component public class DMaaPCambriaLimiter { + private final HashMap<String, RateInfo> fRateInfo; + private final HashMap<String, RateInfoCheck> fRateInfoCheck; + private final double fMaxEmptyPollsPerMinute; + private final double fMaxPollsPerMinute; + private final int fWindowLengthMins; + private final long fSleepMs; + private final long fSleepMs1; + private static final EELFLogger log = EELFManager.getInstance().getLogger(DMaaPCambriaLimiter.class); + /** * constructor initializes * @@ -62,10 +68,9 @@ public class DMaaPCambriaLimiter { * @throws invalidSettingValue */ @Autowired - public DMaaPCambriaLimiter(@Qualifier("propertyReader") rrNvReadable settings) - throws missingReqdSetting, invalidSettingValue { - fRateInfo = new HashMap<String, RateInfo>(); - fRateInfoCheck = new HashMap<String, RateInfoCheck>(); + public DMaaPCambriaLimiter(@Qualifier("propertyReader") rrNvReadable settings) { + fRateInfo = new HashMap<>(); + fRateInfoCheck = new HashMap<>(); fMaxEmptyPollsPerMinute = settings.getDouble(CambriaConstants.kSetting_MaxEmptyPollsPerMinute, CambriaConstants.kDefault_MaxEmptyPollsPerMinute); fMaxPollsPerMinute = settings.getDouble(CambriaConstants.kSetting_MaxPollsPerMinute, @@ -78,19 +83,7 @@ public class DMaaPCambriaLimiter { 5000); } - - /** - * static method provide the sleep time - * - * @param ratePerMinute - * @return - */ - public static long getSleepMsForRate(double ratePerMinute) { - if (ratePerMinute <= 0.0) - return 0; - return Math.max(1000, Math.round(60 * 1000 / ratePerMinute)); - } - + /** * Construct a rate limiter. * @@ -111,8 +104,8 @@ public class DMaaPCambriaLimiter { * @param windowLengthMins */ public DMaaPCambriaLimiter(double maxEmptyPollsPerMinute,double maxPollsPerMinute, int windowLengthMins, long sleepMs ,long sleepMS1) { - fRateInfo = new HashMap<String, RateInfo>(); - fRateInfoCheck = new HashMap<String, RateInfoCheck>(); + fRateInfo = new HashMap<>(); + fRateInfoCheck = new HashMap<>(); fMaxEmptyPollsPerMinute = Math.max(0, maxEmptyPollsPerMinute); fMaxPollsPerMinute = Math.max(0, maxPollsPerMinute); fWindowLengthMins = windowLengthMins; @@ -121,6 +114,18 @@ public class DMaaPCambriaLimiter { } /** + * static method provide the sleep time + * + * @param ratePerMinute + * @return + */ + public static long getSleepMsForRate(double ratePerMinute) { + if (ratePerMinute <= 0.0) + return 0; + return Math.max(1000, Math.round(60 * 1000 / ratePerMinute)); + } + + /** * Tell the rate limiter about a call to a topic/group/id. If the rate is * too high, this call delays its return and throws an exception. * @@ -151,6 +156,7 @@ public class DMaaPCambriaLimiter { log.info(ri.getLabel() + ": " + "No sleep configured, just throwing error."); } } catch (InterruptedException e) { + log.error("Exception "+ e); // ignore } @@ -163,37 +169,7 @@ public class DMaaPCambriaLimiter { log.info(errRes.toString()); throw new CambriaApiException(errRes); } - /*if (fMaxPollsPerMinute <= 0) { - return; - } - final RateInfoCheck ric = getRateInfoCheck(topic, consumerGroup, clientId); - final double ratevalue = ric.onCall(); - if (ratevalue > fMaxPollsPerMinute) { - try { - log.warn(ri.getLabel() + ": " + rate + " empty replies/minute, limit is " + fMaxEmptyPollsPerMinute - + "."); - if (fSleepMs1 > fMaxPollsPerMinute) { - log.warn(ri.getLabel() + ": " + "Slowing response with " + fSleepMs - + " ms sleep, then responding in error."); - Thread.sleep(fSleepMs1); - ric.reset(); - } else { - log.info(ri.getLabel() + ": " + "No sleep configured, just throwing error."); - } - } catch (InterruptedException e) { - // ignore - } - - - ErrorResponse errRes = new ErrorResponse(HttpStatusCodes.k429_tooManyRequests, - DMaaPResponseCode.TOO_MANY_REQUESTS.getResponseCode(), - "This client is making too many requests " - + ",decrease the number of requests. ","",Utils.getFormattedDate(new Date()),topic,"","",consumerGroup+"/"+clientId,remoteHost); - - log.info(errRes.toString()); - throw new CambriaApiException(errRes); - }*/ } @@ -213,6 +189,8 @@ public class DMaaPCambriaLimiter { } private static class RateInfo { + private final String fLabel; + private final CdmRateTicker fCallRateSinceLastMsgSend; /** * constructor initialzes * @@ -244,14 +222,14 @@ public class DMaaPCambriaLimiter { fCallRateSinceLastMsgSend.tick(); return fCallRateSinceLastMsgSend.getRate(); } - - private final String fLabel; - private final CdmRateTicker fCallRateSinceLastMsgSend; } private static class RateInfoCheck { + + private final String fLabel; + private final CdmRateTicker fCallRateSinceLastMsgSend; /** * constructor initialzes * @@ -283,21 +261,10 @@ public class DMaaPCambriaLimiter { fCallRateSinceLastMsgSend.tick(); return fCallRateSinceLastMsgSend.getRate(); } - - private final String fLabel; - private final CdmRateTicker fCallRateSinceLastMsgSend; } - private final HashMap<String, RateInfo> fRateInfo; - private final HashMap<String, RateInfoCheck> fRateInfoCheck; - private final double fMaxEmptyPollsPerMinute; - private final double fMaxPollsPerMinute; - private final int fWindowLengthMins; - private final long fSleepMs; - private final long fSleepMs1; - //private static final Logger log = LoggerFactory.getLogger(DMaaPCambriaLimiter.class); - private static final EELFLogger log = EELFManager.getInstance().getLogger(DMaaPCambriaLimiter.class); + private RateInfo getRateInfo(String topic, String consumerGroup, String clientId) { final String key = makeKey(topic, consumerGroup, clientId); @@ -310,15 +277,7 @@ public class DMaaPCambriaLimiter { } - private RateInfoCheck getRateInfoCheck(String topic, String consumerGroup, String clientId) { - final String key = makeKey(topic, consumerGroup, clientId); - RateInfoCheck ri = fRateInfoCheck.get(key); - if (ri == null) { - ri = new RateInfoCheck(key, 1); - fRateInfoCheck.put(key, ri); - } - return ri; - } + diff --git a/src/main/java/com/att/dmf/mr/beans/DMaaPKafkaConsumerFactory.java b/src/main/java/com/att/dmf/mr/beans/DMaaPKafkaConsumerFactory.java index 6fc0838..f60fd53 100644 --- a/src/main/java/com/att/dmf/mr/beans/DMaaPKafkaConsumerFactory.java +++ b/src/main/java/com/att/dmf/mr/beans/DMaaPKafkaConsumerFactory.java @@ -46,8 +46,8 @@ import com.att.dmf.mr.backends.kafka.KafkaLiveLockAvoider2; import com.att.dmf.mr.backends.kafka.LiveLockAvoidance; import com.att.dmf.mr.constants.CambriaConstants; import com.att.dmf.mr.utils.ConfigurationReader; -//import org.slf4j.Logger; -//import org.slf4j.LoggerFactory; + + import com.att.eelf.configuration.EELFLogger; import com.att.eelf.configuration.EELFManager; import com.att.nsa.drumlin.till.nv.rrNvReadable.missingReqdSetting; @@ -58,12 +58,9 @@ import com.att.nsa.drumlin.till.nv.rrNvReadable.missingReqdSetting; */ public class DMaaPKafkaConsumerFactory implements ConsumerFactory { - // private static final Logger log = LoggerFactory - // .getLogger(DMaaPKafkaConsumerFactory.class); + private static final EELFLogger log = EELFManager.getInstance().getLogger(DMaaPKafkaConsumerFactory.class); - // @Autowired - // private KafkaLiveLockAvoider kafkaLiveLockAvoider = new - // KafkaLiveLockAvoider(); + /** * constructor initialization @@ -106,8 +103,7 @@ public class DMaaPKafkaConsumerFactory implements ConsumerFactory { final boolean isCacheEnabled = kSetting_EnableCache; - // fCache = (isCacheEnabled) ? new KafkaConsumerCache(apiNodeId, - // metrics) : null; + fCache = null; if (isCacheEnabled) { fCache = KafkaConsumerCache.getInstance(); @@ -189,14 +185,15 @@ public class DMaaPKafkaConsumerFactory implements ConsumerFactory { log.info("Creating Kafka consumer for group [" + consumerGroupName + "], consumer [" + consumerId + "], on topic [" + topic + "]."); - - fCache.signalOwnership(topic, consumerGroupName, consumerId); - + + if (fCache != null) { + fCache.signalOwnership(topic, consumerGroupName, consumerId); + } + final Properties props = createConsumerConfig(topic,consumerGroupName, consumerId); long fCreateTimeMs = System.currentTimeMillis(); KafkaConsumer<String, String> cc = new KafkaConsumer<>(props); - kc = new Kafka011Consumer(topic, consumerGroupName, consumerId, cc, fkafkaLiveLockAvoider);// ,fCache.getkafkaLiveLockAvoiderObj() - // ); + kc = new Kafka011Consumer(topic, consumerGroupName, consumerId, cc, fkafkaLiveLockAvoider); log.info(" kafka stream created in " + (System.currentTimeMillis() - fCreateTimeMs)); if (fCache != null) { @@ -265,10 +262,9 @@ public class DMaaPKafkaConsumerFactory implements ConsumerFactory { private void transferSettingIfProvided(Properties target, String key, String prefix) { String keyVal = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, makeLongKey(key, prefix)); - // if (fSettings.hasValueFor(makeLongKey(key, prefix))) { + if (null != keyVal) { - // final String val = fSettings - // .getString(makeLongKey(key, prefix), ""); + log.info("Setting [" + key + "] to " + keyVal + "."); target.put(key, keyVal); } @@ -294,10 +290,8 @@ public class DMaaPKafkaConsumerFactory implements ConsumerFactory { props.put("group.id", fakeGroupName); props.put("enable.auto.commit", "false"); // 0.11 props.put("bootstrap.servers", fkafkaBrokers); - /*props.put("sasl.jaas.config", - "org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='admin_secret';"); - props.put("security.protocol", "SASL_PLAINTEXT"); - props.put("sasl.mechanism", "PLAIN");*/ + + props.put("client.id", consumerId); // additional settings: start with our defaults, then pull in configured diff --git a/src/main/java/com/att/dmf/mr/beans/DMaaPKafkaMetaBroker.java b/src/main/java/com/att/dmf/mr/beans/DMaaPKafkaMetaBroker.java index 643eae9..4bef985 100644 --- a/src/main/java/com/att/dmf/mr/beans/DMaaPKafkaMetaBroker.java +++ b/src/main/java/com/att/dmf/mr/beans/DMaaPKafkaMetaBroker.java @@ -50,7 +50,7 @@ import com.att.dmf.mr.utils.ConfigurationReader; //import org.apache.log4-j.Logger; import com.att.eelf.configuration.EELFLogger; import com.att.eelf.configuration.EELFManager; -//import com.att.dmf.mr.backends.kafka.kafka011.SettingsUtil; + import com.att.nsa.configs.ConfigDb; import com.att.nsa.configs.ConfigDbException; import com.att.nsa.configs.ConfigPath; @@ -85,11 +85,9 @@ public class DMaaPKafkaMetaBroker implements Broker1 { props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, fkafkaBrokers ); - /* props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='admin_secret';"); - props.put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT"); - props.put("sasl.mechanism", "PLAIN");*/ + fKafkaAdminClient=AdminClient.create ( props ); - // fKafkaAdminClient = null; + } //private static final Logger log = Logger.getLogger(DMaaPKafkaMetaBroker.class); @@ -122,23 +120,21 @@ public class DMaaPKafkaMetaBroker implements Broker1 { props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, fkafkaBrokers ); - /* props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='admin_secret';"); - props.put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT"); - props.put("sasl.mechanism", "PLAIN");*/ + fKafkaAdminClient=AdminClient.create ( props ); - // fKafkaAdminClient = null; + } public DMaaPKafkaMetaBroker( rrNvReadable settings, ZkClient zk, ConfigDb configDb,AdminClient client) { - //fSettings = settings; + fZk = zk; fCambriaConfig = configDb; fBaseTopicData = configDb.parse("/topics"); fKafkaAdminClient= client; - // fKafkaAdminClient = null; + } @@ -235,13 +231,13 @@ public class DMaaPKafkaMetaBroker implements Broker1 { } catch ( InterruptedException e ) { - //timer.fail ( "Timeout" ); + log.warn ( "Execution of describeTopics timed out." ); throw new ConfigDbException ( e ); } catch ( ExecutionException e ) { - //timer.fail ( "ExecutionError" ); + log.warn ( "Execution of describeTopics failed: " + e.getCause ().getMessage (), e.getCause () ); throw new ConfigDbException ( e.getCause () ); } @@ -256,16 +252,11 @@ public class DMaaPKafkaMetaBroker implements Broker1 { log.info("Loading zookeeper client for topic deletion."); // topic creation. (Otherwise, the topic is only partially created // in ZK.) - /*zkClient = new ZkClient(ConfigurationReader.getMainZookeeperConnectionString(), 10000, 10000, - ZKStringSerializer$.MODULE$); - String strkSettings_KafkaZookeeper = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,CambriaConstants.kSetting_ZkConfigDbServers); - if (null==strkSettings_KafkaZookeeper) strkSettings_KafkaZookeeper = CambriaConstants.kDefault_ZkConfigDbServers; - ZkUtils zkutils =new ZkUtils(zkClient , new ZkConnection(strkSettings_KafkaZookeeper),false); - */ + fKafkaAdminClient.deleteTopics(Arrays.asList(topic)); log.info("Zookeeper client loaded successfully. Deleting topic."); - //AdminUtils.deleteTopic(zkutils, topic); + } catch (Exception e) { log.error("Failed to delete topic [" + topic + "]. " + e.getMessage(), e); throw new ConfigDbException(e); diff --git a/src/main/java/com/att/dmf/mr/beans/DMaaPMetricsSet.java b/src/main/java/com/att/dmf/mr/beans/DMaaPMetricsSet.java index 9942837..4c9532b 100644 --- a/src/main/java/com/att/dmf/mr/beans/DMaaPMetricsSet.java +++ b/src/main/java/com/att/dmf/mr/beans/DMaaPMetricsSet.java @@ -84,9 +84,9 @@ public class DMaaPMetricsSet extends CdmMetricsRegistryImpl implements MetricsSe * * @param cs */ - //public DMaaPMetricsSet() { + public DMaaPMetricsSet(rrNvReadable cs) { - //fSettings = cs; + fVersion = new CdmStringConstant("Version " + CambriaApiVersionInfo.getVersion()); super.putItem("version", fVersion); diff --git a/src/main/java/com/att/dmf/mr/beans/DMaaPNsaApiDb.java b/src/main/java/com/att/dmf/mr/beans/DMaaPNsaApiDb.java index e29403f..963ff2d 100644 --- a/src/main/java/com/att/dmf/mr/beans/DMaaPNsaApiDb.java +++ b/src/main/java/com/att/dmf/mr/beans/DMaaPNsaApiDb.java @@ -23,7 +23,7 @@ package com.att.dmf.mr.beans; import java.security.Key; -//import org.apache.log4-j.Logger; + import org.springframework.beans.factory.annotation.Autowired; import com.att.dmf.mr.constants.CambriaConstants; @@ -48,11 +48,11 @@ import com.att.nsa.util.rrConvertor; */ public class DMaaPNsaApiDb { - //private rrNvReadable settings; + private DMaaPZkConfigDb cdb; //private static final Logger log = Logger - // .getLogger(DMaaPNsaApiDb.class.toString()); + private static final EELFLogger log = EELFManager.getInstance().getLogger(DMaaPNsaApiDb.class); /** @@ -63,7 +63,7 @@ public class DMaaPNsaApiDb { */ @Autowired public DMaaPNsaApiDb(rrNvReadable settings, DMaaPZkConfigDb cdb) { - //this.setSettings(settings); + this.setCdb(cdb); } /** @@ -79,16 +79,16 @@ public class DMaaPNsaApiDb { missingReqdSetting { // Cambria uses an encrypted api key db - //final String keyBase64 = settings.getString("cambria.secureConfig.key", null); + final String keyBase64 =com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"cambria.secureConfig.key"); - // final String initVectorBase64 = settings.getString( "cambria.secureConfig.iv", null); + final String initVectorBase64 =com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"cambria.secureConfig.iv"); // if neither value was provided, don't encrypt api key db if (keyBase64 == null && initVectorBase64 == null) { log.info("This server is configured to use an unencrypted API key database. See the settings documentation."); - return new BaseNsaApiDbImpl<NsaSimpleApiKey>(cdb, + return new BaseNsaApiDbImpl<>(cdb, new NsaSimpleApiKeyFactory()); } else if (keyBase64 == null) { // neither or both, otherwise something's goofed @@ -100,7 +100,7 @@ public class DMaaPNsaApiDb { log.info("This server is configured to use an encrypted API key database."); final Key key = EncryptingLayer.readSecretKey(keyBase64); final byte[] iv = rrConvertor.base64Decode(initVectorBase64); - return new EncryptingApiDbImpl<NsaSimpleApiKey>(cdb, + return new EncryptingApiDbImpl<>(cdb, new NsaSimpleApiKeyFactory(), key, iv); } } @@ -109,17 +109,17 @@ public class DMaaPNsaApiDb { * @return * returns settings */ -/* public rrNvReadable getSettings() { - return settings; - }*/ + + + /** * @param settings * set settings */ - /*public void setSettings(rrNvReadable settings) { - this.settings = settings; - }*/ + + + /** * @return diff --git a/src/main/java/com/att/dmf/mr/beans/DMaaPZkConfigDb.java b/src/main/java/com/att/dmf/mr/beans/DMaaPZkConfigDb.java index d543721..5aa25fa 100644 --- a/src/main/java/com/att/dmf/mr/beans/DMaaPZkConfigDb.java +++ b/src/main/java/com/att/dmf/mr/beans/DMaaPZkConfigDb.java @@ -26,7 +26,7 @@ import org.springframework.beans.factory.annotation.Qualifier; import com.att.dmf.mr.utils.ConfigurationReader; import com.att.nsa.configs.confimpl.ZkConfigDb; import com.att.nsa.drumlin.till.nv.rrNvReadable; -//import com.att.nsa.configs.confimpl.ZkConfigDb; + /** * Provide the zookeeper config db connection * @author nilanjana.maity @@ -42,7 +42,7 @@ public class DMaaPZkConfigDb extends ZkConfigDb { public DMaaPZkConfigDb(@Qualifier("dMaaPZkClient") DMaaPZkClient zk, @Qualifier("propertyReader") rrNvReadable settings) { - //super(com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,CambriaConstants.kSetting_ZkConfigDbRoot)==null?CambriaConstants.kDefault_ZkConfigDbRoot:com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,CambriaConstants.kSetting_ZkConfigDbRoot)); + super(ConfigurationReader.getMainZookeeperConnectionString(),ConfigurationReader.getMainZookeeperConnectionSRoot()); } diff --git a/src/main/java/com/att/dmf/mr/exception/DMaaPWebExceptionMapper.java b/src/main/java/com/att/dmf/mr/exception/DMaaPWebExceptionMapper.java index a971c3f..db691bd 100644 --- a/src/main/java/com/att/dmf/mr/exception/DMaaPWebExceptionMapper.java +++ b/src/main/java/com/att/dmf/mr/exception/DMaaPWebExceptionMapper.java @@ -35,7 +35,7 @@ import javax.ws.rs.ext.ExceptionMapper; import javax.ws.rs.ext.Provider; import org.apache.http.HttpStatus; -//import org.apache.log-4j.Logger; + import org.springframework.beans.factory.annotation.Autowired; import com.att.eelf.configuration.EELFLogger; @@ -51,8 +51,7 @@ import com.att.eelf.configuration.EELFManager; @Singleton public class DMaaPWebExceptionMapper implements ExceptionMapper<WebApplicationException>{ - //private static final Logger LOGGER = Logger - // .getLogger(DMaaPWebExceptionMapper.class); + private static final EELFLogger LOGGER = EELFManager.getInstance().getLogger(DMaaPWebExceptionMapper.class); private ErrorResponse errRes; diff --git a/src/main/java/com/att/dmf/mr/listener/CambriaServletContextListener.java b/src/main/java/com/att/dmf/mr/listener/CambriaServletContextListener.java index 6022b91..64b20e8 100644 --- a/src/main/java/com/att/dmf/mr/listener/CambriaServletContextListener.java +++ b/src/main/java/com/att/dmf/mr/listener/CambriaServletContextListener.java @@ -35,7 +35,7 @@ import com.att.eelf.configuration.EELFManager; public class CambriaServletContextListener implements ServletContextListener { DME2EndPointLoader loader = DME2EndPointLoader.getInstance(); -// private static Logger log = Logger.getLogger(CambriaServletContextListener.class); + private static final EELFLogger log = EELFManager.getInstance().getLogger(CambriaServletContextListener.class); diff --git a/src/main/java/com/att/dmf/mr/listener/DME2EndPointLoader.java b/src/main/java/com/att/dmf/mr/listener/DME2EndPointLoader.java index 7f27798..f61b6ea 100644 --- a/src/main/java/com/att/dmf/mr/listener/DME2EndPointLoader.java +++ b/src/main/java/com/att/dmf/mr/listener/DME2EndPointLoader.java @@ -51,7 +51,7 @@ public class DME2EndPointLoader { private String protocol; private String serviceURL; private static DME2EndPointLoader loader = new DME2EndPointLoader(); -// private static final Logger LOG = LoggerFactory.getLogger(EventsServiceImpl.class); + private static final EELFLogger LOG = EELFManager.getInstance().getLogger(EventsServiceImpl.class); private DME2EndPointLoader() { } diff --git a/src/main/java/com/att/dmf/mr/metabroker/Topic.java b/src/main/java/com/att/dmf/mr/metabroker/Topic.java index 422a2cc..d191070 100644 --- a/src/main/java/com/att/dmf/mr/metabroker/Topic.java +++ b/src/main/java/com/att/dmf/mr/metabroker/Topic.java @@ -39,16 +39,16 @@ public interface Topic extends ReadWriteSecuredResource * *//* public class AccessDeniedException extends Exception - { + *//** * AccessDenied Description *//* - public AccessDeniedException () { super ( "Access denied." ); } + *//** * AccessDenied Exception for the user while authenticating the user request * @param user *//* - public AccessDeniedException ( String user ) { super ( "Access denied for " + user ); } + private static final long serialVersionUID = 1L; }*/ diff --git a/src/main/java/com/att/dmf/mr/metrics/publisher/CambriaClient.java b/src/main/java/com/att/dmf/mr/metrics/publisher/CambriaClient.java index 0993aa6..4b219b1 100644 --- a/src/main/java/com/att/dmf/mr/metrics/publisher/CambriaClient.java +++ b/src/main/java/com/att/dmf/mr/metrics/publisher/CambriaClient.java @@ -21,11 +21,11 @@ *******************************************************************************/ package com.att.dmf.mr.metrics.publisher; -//import org.slf4j.Logger; + // import com.att.eelf.configuration.EELFLogger; -//import com.att.eelf.configuration.EELFManager; + /** * diff --git a/src/main/java/com/att/dmf/mr/metrics/publisher/CambriaPublisherUtility.java b/src/main/java/com/att/dmf/mr/metrics/publisher/CambriaPublisherUtility.java index 1510c32..46dfa99 100644 --- a/src/main/java/com/att/dmf/mr/metrics/publisher/CambriaPublisherUtility.java +++ b/src/main/java/com/att/dmf/mr/metrics/publisher/CambriaPublisherUtility.java @@ -95,7 +95,7 @@ public class CambriaPublisherUtility */ public static List<HttpHost> createHostsList(Collection<String> hosts) { - final ArrayList<HttpHost> convertedHosts = new ArrayList<HttpHost> (); + final ArrayList<HttpHost> convertedHosts = new ArrayList<>(); for ( String host : hosts ) { if ( host.length () == 0 ) continue; diff --git a/src/main/java/com/att/dmf/mr/metrics/publisher/DMaaPCambriaClientFactory.java b/src/main/java/com/att/dmf/mr/metrics/publisher/DMaaPCambriaClientFactory.java index d02438f..9158c96 100644 --- a/src/main/java/com/att/dmf/mr/metrics/publisher/DMaaPCambriaClientFactory.java +++ b/src/main/java/com/att/dmf/mr/metrics/publisher/DMaaPCambriaClientFactory.java @@ -386,12 +386,7 @@ public class DMaaPCambriaClientFactory { * Your API secret * @return an identity manager */ - /* - * public static CambriaIdentityManager createIdentityManager ( - * Collection<String> hostSet, String apiKey, String apiSecret ) { final - * CambriaIdentityManager cim = new CambriaMetaClient ( hostSet ); - * cim.setApiCredentials ( apiKey, apiSecret ); return cim; } - */ + /** * Create a topic manager for working with topics. @@ -405,12 +400,7 @@ public class DMaaPCambriaClientFactory { * Your API secret * @return a topic manager */ - /* - * public static CambriaTopicManager createTopicManager ( Collection<String> - * hostSet, String apiKey, String apiSecret ) { final CambriaMetaClient tmi - * = new CambriaMetaClient ( hostSet ); tmi.setApiCredentials ( apiKey, - * apiSecret ); return tmi; } - */ + /** * Inject a consumer. Used to support unit tests. diff --git a/src/main/java/com/att/dmf/mr/metrics/publisher/impl/CambriaBaseClient.java b/src/main/java/com/att/dmf/mr/metrics/publisher/impl/CambriaBaseClient.java index 08b2fd1..ebdf3ed 100644 --- a/src/main/java/com/att/dmf/mr/metrics/publisher/impl/CambriaBaseClient.java +++ b/src/main/java/com/att/dmf/mr/metrics/publisher/impl/CambriaBaseClient.java @@ -31,7 +31,7 @@ import org.json.JSONArray; import org.json.JSONException; import com.att.dmf.mr.constants.CambriaConstants; -//import org.slf4j.Logger; + //import org.slf4j.LoggerFactory; import com.att.eelf.configuration.EELFLogger; import com.att.eelf.configuration.EELFManager; @@ -52,12 +52,12 @@ public class CambriaBaseClient extends HttpClient implements com.att.dmf.mr.metr public CambriaBaseClient ( Collection<String> hosts, String clientSignature ) throws MalformedURLException { - /*super ( hosts, CambriaConstants.kStdCambriaServicePort, clientSignature, - CacheUse.NONE, 1, 1, TimeUnit.MILLISECONDS );*/ + + super(ConnectionType.HTTP, hosts, CambriaConstants.kStdCambriaServicePort, clientSignature, CacheUse.NONE, 1, 1L, TimeUnit.MILLISECONDS, 32, 32, 600000); - //fLog = LoggerFactory.getLogger ( this.getClass().getName () ); + fLog = EELFManager.getInstance().getLogger(this.getClass().getName()); //( this.getClass().getName () ); } @@ -85,7 +85,7 @@ public class CambriaBaseClient extends HttpClient implements com.att.dmf.mr.metr { fLog = log; - //replaceLogger ( log ); + } public EELFLogger getLog () diff --git a/src/main/java/com/att/dmf/mr/metrics/publisher/impl/DMaaPCambriaSimplerBatchPublisher.java b/src/main/java/com/att/dmf/mr/metrics/publisher/impl/DMaaPCambriaSimplerBatchPublisher.java index d8d8799..dee9e57 100644 --- a/src/main/java/com/att/dmf/mr/metrics/publisher/impl/DMaaPCambriaSimplerBatchPublisher.java +++ b/src/main/java/com/att/dmf/mr/metrics/publisher/impl/DMaaPCambriaSimplerBatchPublisher.java @@ -186,7 +186,7 @@ public class DMaaPCambriaSimplerBatchPublisher extends CambriaBaseClient public void close() { try { final List<message> remains = close(Long.MAX_VALUE, TimeUnit.MILLISECONDS); - if (remains.size() > 0) { + if (remains.isEmpty()) { getLog().warn("Closing publisher with " + remains.size() + " messages unsent. " + "Consider using CambriaBatchingPublisher.close( long timeout, TimeUnit timeoutUnits ) to recapture unsent messages on close."); } @@ -251,7 +251,7 @@ public class DMaaPCambriaSimplerBatchPublisher extends CambriaBaseClient */ private synchronized boolean shouldSendNow() { boolean shouldSend = false; - if (fPending.size() > 0) { + if (fPending.isEmpty()) { final long nowMs = Clock.now(); shouldSend = (fPending.size() >= fMaxBatchSize); @@ -273,7 +273,7 @@ public class DMaaPCambriaSimplerBatchPublisher extends CambriaBaseClient private synchronized boolean sendBatch() { // it's possible for this call to be made with an empty list. in this // case, just return. - if (fPending.size() < 1) { + if (fPending.isEmpty()) { return true; } @@ -305,8 +305,8 @@ public class DMaaPCambriaSimplerBatchPublisher extends CambriaBaseClient // code from REST Client Starts - // final String serverCalculatedSignature = sha1HmacSigner.sign - // ("2015-09-21T11:38:19-0700", "iHAxArrj6Ve9JgmHvR077QiV"); + + Client client = ClientBuilder.newClient(); String metricTopicname = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"metrics.send.cambria.topic"); @@ -323,32 +323,19 @@ public class DMaaPCambriaSimplerBatchPublisher extends CambriaBaseClient Entity<byte[]> data = Entity.entity(baseStream.toByteArray(), "application/cambria"); Response response = target.request().post(data); - // header("X-CambriaAuth", - // "2OH46YIWa329QpEF:"+serverCalculatedSignature). - // header("X-CambriaDate", "2015-09-21T11:38:19-0700"). - // post(Entity.json(baseStream.toByteArray())); - + getLog().info("Response received :: " + response.getStatus()); getLog().info("Response received :: " + response.toString()); // code from REST Client Ends - /* - * final JSONObject result = post ( url, contentType, - * baseStream.toByteArray(), true ); final String logLine = - * "cambria reply ok (" + (Clock.now()-startMs) + " ms):" + - * result.toString (); getLog().info ( logLine ); - */ + fPending.clear(); return true; } catch (IllegalArgumentException x) { getLog().warn(x.getMessage(), x); } - /* - * catch ( HttpObjectNotFoundException x ) { getLog().warn ( - * x.getMessage(), x ); } catch ( HttpException x ) { getLog().warn ( - * x.getMessage(), x ); } - */ + catch (IOException x) { getLog().warn(x.getMessage(), x); } diff --git a/src/main/java/com/att/dmf/mr/resources/streamReaders/CambriaJsonStreamReader.java b/src/main/java/com/att/dmf/mr/resources/streamReaders/CambriaJsonStreamReader.java index 98ddb50..7a67c92 100644 --- a/src/main/java/com/att/dmf/mr/resources/streamReaders/CambriaJsonStreamReader.java +++ b/src/main/java/com/att/dmf/mr/resources/streamReaders/CambriaJsonStreamReader.java @@ -81,10 +81,7 @@ public class CambriaJsonStreamReader implements reader { final int c = fTokens.next(); - /*if (c ==','){ - fCloseCount++; - System.out.println("fCloseCount=" + fCloseCount +" fCount "+fCount); - }*/ + if (fIsList) { if (c == ']' || (fCount > 0 && c == 10)) return null; @@ -125,7 +122,7 @@ public class CambriaJsonStreamReader implements reader { * * @param o */ - //public msg(JSONObject o){} + public msg(JSONObject o) { diff --git a/src/main/java/com/att/dmf/mr/resources/streamReaders/CambriaRawStreamReader.java b/src/main/java/com/att/dmf/mr/resources/streamReaders/CambriaRawStreamReader.java index 376d140..f64c0de 100644 --- a/src/main/java/com/att/dmf/mr/resources/streamReaders/CambriaRawStreamReader.java +++ b/src/main/java/com/att/dmf/mr/resources/streamReaders/CambriaRawStreamReader.java @@ -137,5 +137,5 @@ public class CambriaRawStreamReader implements reader private final InputStream fStream; private final String fDefPart; private boolean fClosed; - //private String transactionId; + } diff --git a/src/main/java/com/att/dmf/mr/security/DMaaPAAFAuthenticatorImpl.java b/src/main/java/com/att/dmf/mr/security/DMaaPAAFAuthenticatorImpl.java index b550373..ed0893d 100644 --- a/src/main/java/com/att/dmf/mr/security/DMaaPAAFAuthenticatorImpl.java +++ b/src/main/java/com/att/dmf/mr/security/DMaaPAAFAuthenticatorImpl.java @@ -46,7 +46,7 @@ public class DMaaPAAFAuthenticatorImpl implements DMaaPAAFAuthenticator { auth = true; } - //System.out.println("role " +role +" user: "+ req.getRemoteUser() +" : auth="+auth); + return auth; } @@ -57,7 +57,7 @@ public class DMaaPAAFAuthenticatorImpl implements DMaaPAAFAuthenticator { String permission = ""; String nameSpace =""; if(topicName.contains(".") && topicName.contains("com.att")) { - //String topic = topicName.substring(topicName.lastIndexOf(".")+1); + nameSpace = topicName.substring(0,topicName.lastIndexOf(".")); } else { @@ -67,12 +67,7 @@ public class DMaaPAAFAuthenticatorImpl implements DMaaPAAFAuthenticator { if(null==nameSpace)nameSpace="com.att.dmaap.mr.ueb"; - /*ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN, - DMaaPResponseCode.TOPIC_NOT_IN_AAF.getResponseCode(), "Topic does not exist in AAF" - , null, Utils.getFormattedDate(new Date()), topicName, - null, null, null, null); - - throw new CambriaApiException(errRes);*/ + } permission = nameSpace+".mr.topic|:topic."+topicName+"|"+action; diff --git a/src/main/java/com/att/dmf/mr/security/impl/DMaaPMechIdAuthenticator.java b/src/main/java/com/att/dmf/mr/security/impl/DMaaPMechIdAuthenticator.java index e9f28ae..64dbc14 100644 --- a/src/main/java/com/att/dmf/mr/security/impl/DMaaPMechIdAuthenticator.java +++ b/src/main/java/com/att/dmf/mr/security/impl/DMaaPMechIdAuthenticator.java @@ -25,7 +25,7 @@ import javax.servlet.http.HttpServletRequest; import com.att.dmf.mr.beans.DMaaPContext; import com.att.dmf.mr.security.DMaaPAuthenticator; -//import com.att.nsa.security.db.NsaApiDb; + import com.att.eelf.configuration.EELFLogger; import com.att.eelf.configuration.EELFManager; import com.att.nsa.security.NsaApiKey; @@ -65,7 +65,7 @@ public class DMaaPMechIdAuthenticator <K extends NsaApiKey> implements DMaaPAuth log.info ( "AUTH-LOG(" + remoteAddr + "): " + msg ); } -// private final NsaApiDb<K> fDb; + //private static final Logger log = Logger.getLogger( MechIdAuthenticator.class.toString()); private static final EELFLogger log = EELFManager.getInstance().getLogger(MechIdAuthenticator.class); /** diff --git a/src/main/java/com/att/dmf/mr/security/impl/DMaaPOriginalUebAuthenticator.java b/src/main/java/com/att/dmf/mr/security/impl/DMaaPOriginalUebAuthenticator.java index a26c9e7..b1e28e7 100644 --- a/src/main/java/com/att/dmf/mr/security/impl/DMaaPOriginalUebAuthenticator.java +++ b/src/main/java/com/att/dmf/mr/security/impl/DMaaPOriginalUebAuthenticator.java @@ -54,9 +54,9 @@ public class DMaaPOriginalUebAuthenticator<K extends NsaApiKey> implements DMaaP public DMaaPOriginalUebAuthenticator(NsaApiDb<K> db, long requestTimeWindowMs) { fDb = db; fRequestTimeWindowMs = requestTimeWindowMs; - //fAuthenticators = new LinkedList<DMaaPAuthenticator<K>>(); + - //fAuthenticators.add(new DMaaPOriginalUebAuthenticator<K>(db, requestTimeWindowMs)); + } @@ -243,51 +243,51 @@ public class DMaaPOriginalUebAuthenticator<K extends NsaApiKey> implements DMaaP "EEEE, dd-MMM-yy HH:mm:ss zzz", }; - /*private static final String kDateFormats[] = { - // W3C date format (RFC 3339). - "yyyy-MM-dd'T'HH:mm:ssz", + + + - // Preferred HTTP date format (RFC 1123). - "EEE, dd MMM yyyy HH:mm:ss zzz", + + - // simple unix command line 'date' format - "EEE MMM dd HH:mm:ss z yyyy", + + - // Common date format (RFC 822). - "EEE, dd MMM yy HH:mm:ss z", "EEE, dd MMM yy HH:mm z", "dd MMM yy HH:mm:ss z", "dd MMM yy HH:mm z", + + - // Obsoleted HTTP date format (ANSI C asctime() format). - "EEE MMM dd HH:mm:ss yyyy", + + - // Obsoleted HTTP date format (RFC 1036). - "EEEE, dd-MMM-yy HH:mm:ss zzz", }; */ + + // logger declaration - //private static final Logger log = Logger.getLogger(DMaaPOriginalUebAuthenticator.class.toString()); + private static final EELFLogger log = EELFManager.getInstance().getLogger(DMaaPOriginalUebAuthenticator.class); @Override -// public K authenticate(DMaaPContext ctx) { + // TODO Auto-generated method stub - //return null; + //} public K authenticate(DMaaPContext ctx) { - /*final HttpServletRequest req = ctx.getRequest(); - for (DMaaPAuthenticator<K> a : fAuthenticators) { - if (a.qualify(req)) { - final K k = a.isAuthentic(req); - if (k != null) - return k; - } - // else: this request doesn't look right to the authenticator - }*/ + + + + + + + + + return null; } public void addAuthenticator ( DMaaPAuthenticator<K> a ) { - //this.fAuthenticators.add(a); + } - //private final LinkedList<DMaaPAuthenticator<K>> fAuthenticators; + }
\ No newline at end of file diff --git a/src/main/java/com/att/dmf/mr/service/impl/AdminServiceImpl.java b/src/main/java/com/att/dmf/mr/service/impl/AdminServiceImpl.java index 110970f..f7c48de 100644 --- a/src/main/java/com/att/dmf/mr/service/impl/AdminServiceImpl.java +++ b/src/main/java/com/att/dmf/mr/service/impl/AdminServiceImpl.java @@ -42,7 +42,7 @@ import com.att.nsa.configs.ConfigDbException; import com.att.nsa.limits.Blacklist; import com.att.nsa.security.NsaApiKey; import com.att.nsa.security.ReadWriteSecuredResource.AccessDeniedException; -//import com.att.sa.highlandPark.util.HpJsonUtil; + /** * @author muzainulhaque.qazi diff --git a/src/main/java/com/att/dmf/mr/service/impl/ApiKeysServiceImpl.java b/src/main/java/com/att/dmf/mr/service/impl/ApiKeysServiceImpl.java index c818f88..b0e8a86 100644 --- a/src/main/java/com/att/dmf/mr/service/impl/ApiKeysServiceImpl.java +++ b/src/main/java/com/att/dmf/mr/service/impl/ApiKeysServiceImpl.java @@ -54,7 +54,7 @@ import com.att.nsa.security.db.simple.NsaSimpleApiKey; @Service public class ApiKeysServiceImpl implements ApiKeysService { - //private Logger log = Logger.getLogger(ApiKeysServiceImpl.class.toString()); + private static final EELFLogger log = EELFManager.getInstance().getLogger(ApiKeysServiceImpl.class.toString()); /** * This method will provide all the ApiKeys present in kafka server. @@ -139,7 +139,7 @@ public class ApiKeysServiceImpl implements ApiKeysService { String kSetting_AllowAnonymousKeys= com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"apiKeys.allowAnonymous"); if(null==kSetting_AllowAnonymousKeys) kSetting_AllowAnonymousKeys ="false"; - // if ((contactEmail == null) || (contactEmail.length() == 0)) + if ( kSetting_AllowAnonymousKeys.equalsIgnoreCase("true") && !emailProvided ) { DMaaPResponseBuilder.respondWithErrorInJson(dmaapContext, 400, "You must provide an email address."); @@ -165,7 +165,7 @@ public class ApiKeysServiceImpl implements ApiKeysService { log.debug("=======ApiKeysServiceImpl: createApiKey : saving api key : " + key.toString() + "====="); apiKeyDb.saveApiKey(key); - // System.out.println("here4"); + // email out the secret to validate the email address if ( emailProvided ) { @@ -196,9 +196,7 @@ public class ApiKeysServiceImpl implements ApiKeysService { ); DMaaPResponseBuilder.respondOk(dmaapContext, o); - /*o.put("secret", "Emailed to " + contactEmail + "."); - DMaaPResponseBuilder.respondOk(dmaapContext, - o); */ + return; } else { log.debug("=======ApiKeysServiceImpl: createApiKey : Error in creating API Key.====="); diff --git a/src/main/java/com/att/dmf/mr/service/impl/EventsServiceImpl.java b/src/main/java/com/att/dmf/mr/service/impl/EventsServiceImpl.java index 4ca6446..22b60fe 100644 --- a/src/main/java/com/att/dmf/mr/service/impl/EventsServiceImpl.java +++ b/src/main/java/com/att/dmf/mr/service/impl/EventsServiceImpl.java @@ -94,7 +94,7 @@ import com.att.nsa.util.rrConvertor; @Service public class EventsServiceImpl implements EventsService { // private static final Logger LOG = - // Logger.getLogger(EventsServiceImpl.class); + private static final EELFLogger LOG = EELFManager.getInstance().getLogger(EventsServiceImpl.class); private static final String BATCH_LENGTH = "event.batch.length"; @@ -103,10 +103,10 @@ public class EventsServiceImpl implements EventsService { private DMaaPErrorMessages errorMessages; //@Autowired - //KafkaLiveLockAvoider2 kafkaLiveLockAvoider; + // @Value("${metrics.send.cambria.topic}") - // private String metricsTopic; + public DMaaPErrorMessages getErrorMessages() { return errorMessages; @@ -133,7 +133,7 @@ public class EventsServiceImpl implements EventsService { CambriaApiException, IOException, DMaaPAccessDeniedException { final long startTime = System.currentTimeMillis(); final HttpServletRequest req = ctx.getRequest(); - //System.out.println(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>"+kafkaLiveLockAvoider); + boolean isAAFTopic = false; // was this host blacklisted? final String remoteAddr = Utils.getRemoteAddress(ctx); @@ -158,7 +158,7 @@ public class EventsServiceImpl implements EventsService { if (strtimeoutMS != null) timeoutMs = Integer.parseInt(strtimeoutMS); // int timeoutMs = ctx.getConfigReader().getSettings().getInt("timeout", - // CambriaConstants.kNoTimeout); + if (req.getParameter("timeout") != null) { timeoutMs = Integer.parseInt(req.getParameter("timeout")); } @@ -214,7 +214,7 @@ public class EventsServiceImpl implements EventsService { // if headers are not provided then user will be null if (user == null && null != ctx.getRequest().getHeader("Authorization")) { // the topic name will be sent by the client - // String permission = "com.att.dmaap.mr.topic"+"|"+topic+"|"+"sub"; + DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl(); String permission = aaf.aafPermissionString(topic, "sub"); if (!aaf.aafAuthentication(ctx.getRequest(), permission)) { @@ -234,7 +234,7 @@ public class EventsServiceImpl implements EventsService { logger.info("Time taken in getEvents Authorization " + elapsedMs1 + " ms for " + topic + " " + consumerGroup + " " + clientId); Consumer c = null; - // String localclientId = clientId; + String lhostId = com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "clusterhostid"); if (null == lhostId) { @@ -481,16 +481,16 @@ public class EventsServiceImpl implements EventsService { // start processing, building a batch to push to the backend final long startMs = System.currentTimeMillis(); long count = 0; - long maxEventBatch = 1024 * 16; + long maxEventBatch = 1024L* 16; String batchlen = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, BATCH_LENGTH); if (null != batchlen) maxEventBatch = Long.parseLong(batchlen); // long maxEventBatch = // ctx.getConfigReader().getSettings().getLong(BATCH_LENGTH, 1024 * 16); - final LinkedList<Publisher.message> batch = new LinkedList<Publisher.message>(); + final LinkedList<Publisher.message> batch = new LinkedList<>(); // final ArrayList<KeyedMessage<String, String>> kms = new // ArrayList<KeyedMessage<String, String>>(); - final ArrayList<ProducerRecord<String, String>> pms = new ArrayList<ProducerRecord<String, String>>(); + final ArrayList<ProducerRecord<String, String>> pms = new ArrayList<>(); try { // for each message... Publisher.message m = null; @@ -592,7 +592,7 @@ public class EventsServiceImpl implements EventsService { // start processing, building a batch to push to the backend final long startMs = System.currentTimeMillis(); long count = 0; - long maxEventBatch = 1024 * 16; + long maxEventBatch = 1024L * 16; String evenlen = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, BATCH_LENGTH); if (null != evenlen) maxEventBatch = Long.parseLong(evenlen); diff --git a/src/main/java/com/att/dmf/mr/service/impl/MetricsServiceImpl.java b/src/main/java/com/att/dmf/mr/service/impl/MetricsServiceImpl.java index 83b3770..d867ea8 100644 --- a/src/main/java/com/att/dmf/mr/service/impl/MetricsServiceImpl.java +++ b/src/main/java/com/att/dmf/mr/service/impl/MetricsServiceImpl.java @@ -49,7 +49,7 @@ import com.att.nsa.metrics.CdmMeasuredItem; @Component public class MetricsServiceImpl implements MetricsService { - //private static final Logger LOG = Logger.getLogger(MetricsService.class.toString()); + private static final EELFLogger LOG = EELFManager.getInstance().getLogger(MetricsService.class); /** * diff --git a/src/main/java/com/att/dmf/mr/service/impl/TopicServiceImpl.java b/src/main/java/com/att/dmf/mr/service/impl/TopicServiceImpl.java index 01ed1cc..7e9d783 100644 --- a/src/main/java/com/att/dmf/mr/service/impl/TopicServiceImpl.java +++ b/src/main/java/com/att/dmf/mr/service/impl/TopicServiceImpl.java @@ -45,7 +45,7 @@ import com.att.dmf.mr.exception.DMaaPResponseCode; import com.att.dmf.mr.exception.ErrorResponse; import com.att.dmf.mr.metabroker.Broker.TopicExistsException; import com.att.dmf.mr.metabroker.Broker1; -//import com.att.dmf.mr.metabroker.Broker1; + import com.att.dmf.mr.metabroker.Topic; import com.att.dmf.mr.security.DMaaPAAFAuthenticator; import com.att.dmf.mr.security.DMaaPAAFAuthenticatorImpl; @@ -67,13 +67,13 @@ import com.att.nsa.security.ReadWriteSecuredResource.AccessDeniedException; public class TopicServiceImpl implements TopicService { // private static final Logger LOGGER = - // Logger.getLogger(TopicServiceImpl.class); + private static final EELFLogger LOGGER = EELFManager.getInstance().getLogger(TopicServiceImpl.class); @Autowired private DMaaPErrorMessages errorMessages; // @Value("${msgRtr.topicfactory.aaf}") - // private String mrFactory; + public DMaaPErrorMessages getErrorMessages() { return errorMessages; @@ -125,7 +125,7 @@ public class TopicServiceImpl implements TopicService { for (Topic topic : getMetaBroker(dmaapContext).getAllTopics()) { JSONObject obj = new JSONObject(); obj.put("topicName", topic.getName()); - // obj.put("description", topic.getDescription()); + obj.put("owner", topic.getOwner()); obj.put("txenabled", topic.isTransactionEnabled()); topicsList.put(obj); @@ -193,7 +193,7 @@ public class TopicServiceImpl implements TopicService { final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(dmaapContext); String key = null; - //String appName = dmaapContext.getRequest().getHeader("AppName"); + String enfTopicName = com.att.ajsc.beans.PropertiesMapBean.getProperty(CambriaConstants.msgRtr_prop, "enforced.topic.name.AAF"); @@ -209,55 +209,55 @@ public class TopicServiceImpl implements TopicService { "Failed to create topic: Access Denied.User does not have permission to perform create topic"); LOGGER.info(errRes.toString()); - // throw new DMaaPAccessDeniedException(errRes); + } } - // else if (user==null && + // (null==dmaapContext.getRequest().getHeader("Authorization") && null - // == dmaapContext.getRequest().getHeader("cookie")) ) { - /*else if (user == null && null == dmaapContext.getRequest().getHeader("Authorization") - ) { - LOGGER.error("Failed to create topic" + topicBean.getTopicName() + ", Authentication failed."); + + + + - ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED, - DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(), - "Failed to create topic: Access Denied.User does not have permission to perform create topic"); + + + - LOGGER.info(errRes.toString()); - // throw new DMaaPAccessDeniedException(errRes); - }*/ + + + if (user == null /*&& (null != dmaapContext.getRequest().getHeader("Authorization") )*/) { - // if (user == null && + // (null!=dmaapContext.getRequest().getHeader("Authorization") || - // null != dmaapContext.getRequest().getHeader("cookie"))) { + // ACL authentication is not provided so we will use the aaf // authentication - /*LOGGER.info("Authorization the topic"); + - String permission = ""; - String nameSpace = ""; - if (topicBean.getTopicName().indexOf(".") > 1) - nameSpace = topicBean.getTopicName().substring(0, topicBean.getTopicName().lastIndexOf(".")); + + + + - String mrFactoryVal = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, - "msgRtr.topicfactory.aaf"); + + - // AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,kSettings_KafkaZookeeper); + - permission = mrFactoryVal + nameSpace + "|create"; - DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();*/ + + - //if (!aaf.aafAuthentication(dmaapContext.getRequest(), permission)) { + if (false) { LOGGER.error("Failed to create topic" + topicBean.getTopicName() + ", Authentication failed."); ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED, DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(), "Failed to create topic: Access Denied.User does not have permission to create topic with perm " - //+ permission); + + "permission"); @@ -267,14 +267,14 @@ public class TopicServiceImpl implements TopicService { } else { // if user is null and aaf authentication is ok then key should // be "" - // key = ""; + /** * Added as part of AAF user it should return username */ - //key = dmaapContext.getRequest().getUserPrincipal().getName().toString(); - key="admin"; - LOGGER.info("key ==================== " + key); + + + //LOGGER.info("key ==================== " + key); } } @@ -283,7 +283,7 @@ public class TopicServiceImpl implements TopicService { final String topicName = topicBean.getTopicName(); final String desc = topicBean.getTopicDescription(); int partition = topicBean.getPartitionCount(); - // int replica = topicBean.getReplicationCount(); + if (partition == 0) { partition = 8; } @@ -291,7 +291,7 @@ public class TopicServiceImpl implements TopicService { int replica = topicBean.getReplicationCount(); if (replica == 0) { - //replica = 3; + replica = 1; } final int replicas = replica; @@ -319,7 +319,7 @@ public class TopicServiceImpl implements TopicService { throw new CambriaApiException(errRes); } catch (com.att.dmf.mr.metabroker.Broker1.TopicExistsException e) { // TODO Auto-generated catch block - e.printStackTrace(); + LOGGER.error("Exception is at createTopic( ) ", e); } } @@ -503,25 +503,25 @@ public class TopicServiceImpl implements TopicService { LOGGER.info("Granting write access to producer [" + producerId + "] for topic " + topicName); final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(dmaapContext); - // if (user == null) { + // // LOGGER.info("Authenticating the user, as ACL authentication is not - // provided"); + //// String permission = - // "com.att.dmaap.mr.topic"+"|"+topicName+"|"+"manage"; + // - // DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl(); - // String permission = aaf.aafPermissionString(topicName, "manage"); + + // if(!aaf.aafAuthentication(dmaapContext.getRequest(), permission)) // { // LOGGER.error("Failed to permit write access to producer [" + // producerId + "] for topic " + topicName - // + ". Authentication failed."); + // ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN, // DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(), // errorMessages.getNotPermitted1()+" <Grant publish permissions> - // "+errorMessages.getNotPermitted2()+ topicName); - // LOGGER.info(errRes); + + // throw new DMaaPAccessDeniedException(errRes); // } // } @@ -561,25 +561,25 @@ public class TopicServiceImpl implements TopicService { LOGGER.info("Revoking write access to producer [" + producerId + "] for topic " + topicName); final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(dmaapContext); - // if (user == null) { + // //// String permission = - // "com.att.dmaap.mr.topic"+"|"+topicName+"|"+"manage"; + // DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl(); // String permission = aaf.aafPermissionString(topicName, "manage"); // if(!aaf.aafAuthentication(dmaapContext.getRequest(), permission)) // { // LOGGER.error("Failed to revoke write access to producer [" + // producerId + "] for topic " + topicName - // + ". Authentication failed."); + // ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN, // DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(), // errorMessages.getNotPermitted1()+" <Revoke publish permissions> - // "+errorMessages.getNotPermitted2()+ topicName); - // LOGGER.info(errRes); + + // throw new DMaaPAccessDeniedException(errRes); // - // } + // } Topic topic = getMetaBroker(dmaapContext).getTopic(topicName); @@ -612,22 +612,22 @@ public class TopicServiceImpl implements TopicService { LOGGER.info("Granting read access to consumer [" + consumerId + "] for topic " + topicName); final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(dmaapContext); - // if (user == null) { + // //// String permission = - // "com.att.dmaap.mr.topic"+"|"+topicName+"|"+"manage"; - // DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl(); + + // String permission = aaf.aafPermissionString(topicName, "manage"); // if(!aaf.aafAuthentication(dmaapContext.getRequest(), permission)) // { // LOGGER.error("Failed to permit read access to consumer [" + // consumerId + "] for topic " + topicName - // + ". Authentication failed."); + // ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN, // DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(), // errorMessages.getNotPermitted1()+" <Grant consume permissions> - // "+errorMessages.getNotPermitted2()+ topicName); - // LOGGER.info(errRes); + + // throw new DMaaPAccessDeniedException(errRes); // } // } @@ -662,27 +662,26 @@ public class TopicServiceImpl implements TopicService { LOGGER.info("Revoking read access to consumer [" + consumerId + "] for topic " + topicName); final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(dmaapContext); - // if (user == null) { + //// String permission = - // "com.att.dmaap.mr.topic"+"|"+topicName+"|"+"manage"; + // DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl(); // String permission = aaf.aafPermissionString(topicName, "manage"); // if(!aaf.aafAuthentication(dmaapContext.getRequest(), permission)) // { // LOGGER.error("Failed to revoke read access to consumer [" + // consumerId + "] for topic " + topicName - // + ". Authentication failed."); + // ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN, // DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(), // errorMessages.getNotPermitted1()+" <Grant consume permissions> - // "+errorMessages.getNotPermitted2()+ topicName); + // LOGGER.info(errRes); // throw new DMaaPAccessDeniedException(errRes); // } // // - // } - + Topic topic = getMetaBroker(dmaapContext).getTopic(topicName); if (null == topic) { diff --git a/src/main/java/com/att/dmf/mr/service/impl/TransactionServiceImpl.java b/src/main/java/com/att/dmf/mr/service/impl/TransactionServiceImpl.java index ae2d863..3065928 100644 --- a/src/main/java/com/att/dmf/mr/service/impl/TransactionServiceImpl.java +++ b/src/main/java/com/att/dmf/mr/service/impl/TransactionServiceImpl.java @@ -52,7 +52,7 @@ public class TransactionServiceImpl implements TransactionService { throws ConfigDbException, IOException { /* - * ConfigurationReader configReader = dmaapContext.getConfigReader(); + * * LOG.info("configReader : "+configReader.toString()); * @@ -77,7 +77,7 @@ public class TransactionServiceImpl implements TransactionService { IOException { /* - * if (null != transactionId) { + * * ConfigurationReader configReader = dmaapContext.getConfigReader(); * diff --git a/src/main/java/com/att/dmf/mr/service/impl/UIServiceImpl.java b/src/main/java/com/att/dmf/mr/service/impl/UIServiceImpl.java index 33bc2f4..c8bb073 100644 --- a/src/main/java/com/att/dmf/mr/service/impl/UIServiceImpl.java +++ b/src/main/java/com/att/dmf/mr/service/impl/UIServiceImpl.java @@ -27,7 +27,7 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; -//import kafka.common.TopicExistsException; + import org.apache.kafka.common.errors.TopicExistsException; import org.json.JSONArray; import org.json.JSONObject; @@ -50,7 +50,7 @@ import com.att.nsa.security.db.simple.NsaSimpleApiKey; @Service public class UIServiceImpl implements UIService { - //private static final Logger LOGGER = Logger.getLogger(UIServiceImpl.class); + private static final EELFLogger LOGGER = EELFManager.getInstance().getLogger(UIServiceImpl.class); /** * Returning template of hello page diff --git a/src/main/java/com/att/dmf/mr/utils/ConfigurationReader.java b/src/main/java/com/att/dmf/mr/utils/ConfigurationReader.java index dd1e4eb..fdf2d28 100644 --- a/src/main/java/com/att/dmf/mr/utils/ConfigurationReader.java +++ b/src/main/java/com/att/dmf/mr/utils/ConfigurationReader.java @@ -55,7 +55,7 @@ import com.att.nsa.drumlin.till.nv.rrNvReadable.invalidSettingValue; import com.att.nsa.drumlin.till.nv.rrNvReadable.missingReqdSetting; import com.att.nsa.limits.Blacklist; import com.att.nsa.security.NsaAuthenticatorService; -//import com.att.nsa.security.authenticators.OriginalUebAuthenticator; + import com.att.nsa.security.db.BaseNsaApiDbImpl; import com.att.nsa.security.db.NsaApiDb; import com.att.nsa.security.db.NsaApiDb.KeyExistsException; @@ -70,7 +70,7 @@ import com.att.nsa.security.db.simple.NsaSimpleApiKeyFactory; @Component public class ConfigurationReader { -// private rrNvReadable settings; + private Broker1 fMetaBroker; private ConsumerFactory fConsumerFactory; private Publisher fPublisher; @@ -78,7 +78,7 @@ public class ConfigurationReader { @Autowired private DMaaPCambriaLimiter fRateLimiter; private NsaApiDb<NsaSimpleApiKey> fApiKeyDb; - /* private DMaaPTransactionObjDB<DMaaPTransactionObj> fTranDb; */ + private DMaaPAuthenticator<NsaSimpleApiKey> fSecurityManager; private NsaAuthenticatorService<NsaSimpleApiKey> nsaSecurityManager; private static CuratorFramework curator; @@ -90,7 +90,7 @@ public class ConfigurationReader { private Emailer fEmailer; private static final EELFLogger log = EELFManager.getInstance().getLogger(ConfigurationReader.class); - //private static final Logger log = Logger.getLogger(ConfigurationReader.class.toString()); + /** * constructor to initialize all the values @@ -129,7 +129,7 @@ public class ConfigurationReader { @Qualifier("dMaaPAuthenticatorImpl") DMaaPAuthenticator<NsaSimpleApiKey> fSecurityManager ) throws missingReqdSetting, invalidSettingValue, ServletException, KafkaConsumerCacheException, ConfigDbException { - //this.settings = settings; + this.fMetrics = fMetrics; this.zk = zk; this.fConfigDb = fConfigDb; @@ -137,18 +137,18 @@ public class ConfigurationReader { ConfigurationReader.curator = curator; this.fConsumerFactory = fConsumerFactory; this.fMetaBroker = fMetaBroker; - //System.out.println("SSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSs " + fMetaBroker); + this.q = q; this.mmb = mmb; this.fApiKeyDb = fApiKeyDb; - /* this.fTranDb = fTranDb; */ + this.fSecurityManager = fSecurityManager; long allowedtimeSkewMs=600000L; String strallowedTimeSkewM= com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"authentication.allowedTimeSkewMs"); if(null!=strallowedTimeSkewM)allowedtimeSkewMs= Long.parseLong(strallowedTimeSkewM); - // boolean requireSecureChannel = true; + //String strrequireSecureChannel= com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"aauthentication.requireSecureChannel"); //if(strrequireSecureChannel!=null)requireSecureChannel=Boolean.parseBoolean(strrequireSecureChannel); //this.nsaSecurityManager = new NsaAuthenticatorService<NsaSimpleApiKey>(this.fApiKeyDb, settings.getLong("authentication.allowedTimeSkewMs", 600000L), settings.getBoolean("authentication.requireSecureChannel", true)); diff --git a/src/main/java/com/att/dmf/mr/utils/DMaaPResponseBuilder.java b/src/main/java/com/att/dmf/mr/utils/DMaaPResponseBuilder.java index 4c38d57..214aac8 100644 --- a/src/main/java/com/att/dmf/mr/utils/DMaaPResponseBuilder.java +++ b/src/main/java/com/att/dmf/mr/utils/DMaaPResponseBuilder.java @@ -130,10 +130,10 @@ public class DMaaPResponseBuilder { */ public static void respondOkWithStream(DMaaPContext ctx, String mediaType, StreamWriter writer) throws IOException { ctx.getResponse().setStatus(200); - OutputStream os = getStreamForBinaryResponse(ctx, mediaType); - writer.write(os); - os.close(); - + try(OutputStream os = getStreamForBinaryResponse(ctx, mediaType)) { + writer.write(os); + } + } @@ -218,7 +218,7 @@ public class DMaaPResponseBuilder { /** * interface used to define write method for outputStream */ - public static abstract interface StreamWriter { + public abstract static interface StreamWriter { /** * abstract method used to write the response * @@ -252,27 +252,20 @@ public class DMaaPResponseBuilder { boolean fResponseEntityAllowed = (!(ctx.getRequest().getMethod().equalsIgnoreCase("HEAD"))); - - OutputStream os = null; - try{ + if (fResponseEntityAllowed) { - os = ctx.getResponse().getOutputStream(); - return os; + try(OutputStream os = ctx.getResponse().getOutputStream()){ + return os; + }catch (Exception e){ + log.error("Exception in getStreamForBinaryResponse",e); + throw new IOException(); + } } else { - os = new NullStream(); - return os; - } - }catch (Exception e){ - throw new IOException(); - - } - finally{ - if(null != os){ - try{ - os.close(); - }catch(Exception e) { - - } + try(OutputStream os = new NullStream()){ + return os; + }catch (Exception e){ + log.error("Exception in getStreamForBinaryResponse",e); + throw new IOException(); } } } diff --git a/src/main/java/com/att/dmf/mr/utils/PropertyReader.java b/src/main/java/com/att/dmf/mr/utils/PropertyReader.java index 58c9fc9..000869e 100644 --- a/src/main/java/com/att/dmf/mr/utils/PropertyReader.java +++ b/src/main/java/com/att/dmf/mr/utils/PropertyReader.java @@ -39,9 +39,9 @@ public class PropertyReader extends nvReadableStack { * initializing logger * */ - //private static final Logger LOGGER = Logger.getLogger(PropertyReader.class); + private static final EELFLogger log = EELFManager.getInstance().getLogger(PropertyReader.class); -// private static final String MSGRTR_PROPERTIES_FILE = "msgRtrApi.properties"; + /** * constructor initialization @@ -50,11 +50,11 @@ public class PropertyReader extends nvReadableStack { * */ public PropertyReader() throws loadException { - /* Map<String, String> argMap = new HashMap<String, String>(); - final String config = getSetting(argMap, CambriaConstants.kConfig, MSGRTR_PROPERTIES_FILE); - final URL settingStream = findStream(config, ConfigurationReader.class); - push(new nvPropertiesFile(settingStream)); - push(new nvReadableTable(argMap));*/ + + + + + } /** @@ -83,43 +83,43 @@ public class PropertyReader extends nvReadableStack { * @exception MalformedURLException * */ - /*public static URL findStream(final String resourceName, Class<?> clazz) { - try { - File file = new File(resourceName); + + + - if (file.isAbsolute()) { - return file.toURI().toURL(); - } + + + - String filesRoot = System.getProperty("RRWT_FILES", null); + - if (null != filesRoot) { + - String fullPath = filesRoot + "/" + resourceName; + - LOGGER.debug("Looking for [" + fullPath + "]."); + - file = new File(fullPath); - if (file.exists()) { - return file.toURI().toURL(); - } - } + + + + + - URL res = clazz.getClassLoader().getResource(resourceName); + - if (null != res) { - return res; - } + + + - res = ClassLoader.getSystemResource(resourceName); + + + + + + + + + + - if (null != res) { - return res; - } - } catch (MalformedURLException e) { - LOGGER.error("Unexpected failure to convert a local filename into a URL: " + e.getMessage(), e); - } - return null; - } -*/ } diff --git a/src/main/java/com/att/mr/apiServer/metrics/cambria/DMaaPMetricsSender.java b/src/main/java/com/att/mr/apiServer/metrics/cambria/DMaaPMetricsSender.java index 08380fb..0e2804e 100644 --- a/src/main/java/com/att/mr/apiServer/metrics/cambria/DMaaPMetricsSender.java +++ b/src/main/java/com/att/mr/apiServer/metrics/cambria/DMaaPMetricsSender.java @@ -86,7 +86,7 @@ public class DMaaPMetricsSender implements Runnable { String Setting_CambriaTopic=com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,kSetting_CambriaTopic); if(Setting_CambriaTopic==null) Setting_CambriaTopic = "msgrtr.apinode.metrics.dmaap"; - // Setting_CambriaBaseUrl=Setting_CambriaBaseUrl==null?defaultTopic:Setting_CambriaBaseUrl; + String Setting_CambriaSendFreqSecs=com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,kSetting_CambriaSendFreqSecs); @@ -179,7 +179,7 @@ public class DMaaPMetricsSender implements Runnable { private final CambriaPublisher fCambria; private final String fHostname; - //private static final Logger log = LoggerFactory.getLogger(MetricsSender.class); + private static final EELFLogger log = EELFManager.getInstance().getLogger(MetricsSender.class); /** diff --git a/src/main/java/com/att/mr/filter/ContentLengthFilter.java b/src/main/java/com/att/mr/filter/ContentLengthFilter.java index b99f9e6..26f58e0 100644 --- a/src/main/java/com/att/mr/filter/ContentLengthFilter.java +++ b/src/main/java/com/att/mr/filter/ContentLengthFilter.java @@ -52,7 +52,7 @@ public class ContentLengthFilter implements Filter { private FilterConfig filterConfig = null; DMaaPErrorMessages errorMessages = null; - //private Logger log = Logger.getLogger(ContentLengthFilter.class.toString()); + private static final EELFLogger log = EELFManager.getInstance().getLogger(ContentLengthFilter.class); /** * Default constructor. @@ -110,7 +110,7 @@ public class ContentLengthFilter implements Filter { DMaaPResponseCode.MSG_SIZE_EXCEEDS_MSG_LIMIT.getResponseCode(), errorMessages.getMsgSizeExceeds() + jsonObj.toString()); log.info(errRes.toString()); - // throw new CambriaApiException(errRes); + } } |