diff options
Diffstat (limited to 'src/main')
34 files changed, 295 insertions, 382 deletions
diff --git a/src/main/java/com/att/dmf/mr/CambriaApiException.java b/src/main/java/com/att/dmf/mr/CambriaApiException.java index 84dd32c..cdf95ab 100644 --- a/src/main/java/com/att/dmf/mr/CambriaApiException.java +++ b/src/main/java/com/att/dmf/mr/CambriaApiException.java @@ -28,8 +28,12 @@ import com.att.nsa.apiServer.NsaAppException; public class CambriaApiException extends NsaAppException { + /* + * defined long type constant serialVersionUID + */ + private static final long serialVersionUID = 1L; - private ErrorResponse errRes; + private transient ErrorResponse errRes; /** * Implements constructor CambriaApiException * @param jsonObject @@ -66,10 +70,6 @@ public class CambriaApiException extends NsaAppException this.errRes = errRes; } - /* - * defined long type constant serialVersionUID - */ - private static final long serialVersionUID = 1L; public ErrorResponse getErrRes() { return errRes; } diff --git a/src/main/java/com/att/dmf/mr/backends/Consumer.java b/src/main/java/com/att/dmf/mr/backends/Consumer.java index 2743fc3..f4a9a80 100644 --- a/src/main/java/com/att/dmf/mr/backends/Consumer.java +++ b/src/main/java/com/att/dmf/mr/backends/Consumer.java @@ -21,7 +21,6 @@ *******************************************************************************/ package com.att.dmf.mr.backends; -import java.util.ArrayList; /** * A consumer interface. Consumers pull the next message from a given topic. diff --git a/src/main/java/com/att/dmf/mr/backends/kafka/KafkaConsumerCache.java b/src/main/java/com/att/dmf/mr/backends/kafka/KafkaConsumerCache.java index 2bf2fb2..83c08ec 100644 --- a/src/main/java/com/att/dmf/mr/backends/kafka/KafkaConsumerCache.java +++ b/src/main/java/com/att/dmf/mr/backends/kafka/KafkaConsumerCache.java @@ -101,7 +101,7 @@ public class KafkaConsumerCache { // the server at least every 30 seconds, timing out after 2 minutes should // be okay. // FIXME: consider allowing the client to specify its expected call rate? - private static final long kDefault_MustTouchEveryMs = 1000 * 60 * 2; + private static final long kDefault_MustTouchEveryMs = 1000L*60*2; // check for expirations pretty regularly private static final long kDefault_SweepEverySeconds = 15; @@ -298,8 +298,8 @@ public class KafkaConsumerCache { try { curator.blockUntilConnected(); } catch (InterruptedException e) { - // Ignore - log.error("error while setting curator framework :" + e.getMessage()); + log.error("error while setting curator framework :",e); + Thread.currentThread().interrupt(); } } @@ -511,7 +511,8 @@ public class KafkaConsumerCache { consumerHandoverWaitMs = Integer.parseInt(strkSetting_ConsumerHandoverWaitMs); Thread.sleep(consumerHandoverWaitMs); } catch (InterruptedException e) { - // Ignore + log.error("InterruptedException in dropTimedOutConsumer",e); + Thread.currentThread().interrupt(); } log.info("Dropped " + key + " consumer due to timeout"); } @@ -638,9 +639,8 @@ public class KafkaConsumerCache { throws KafkaConsumerCacheException { // get a lock at <base>/<topic>::<consumerGroupId>::<consumerId> final String consumerKey = makeConsumerKey(topic, consumerGroupId, consumerId); - final CdmTimer timer = new CdmTimer(fMetrics, "CacheSignalOwnership"); - try { + try(final CdmTimer timer = new CdmTimer(fMetrics, "CacheSignalOwnership")) { final String consumerPath = fBaseZkPath + "/" + consumerKey; log.debug(fApiId + " attempting to claim ownership of consumer " + consumerKey); final CuratorFramework curator = ConfigurationReader.getCurator(); @@ -667,7 +667,8 @@ public class KafkaConsumerCache { consumerHandoverWaitMs = Integer.parseInt(strkSetting_ConsumerHandoverWaitMs); Thread.sleep(consumerHandoverWaitMs); } catch (InterruptedException e) { - // Ignore + log.error("InterruptedException in signalOwnership",e); + Thread.currentThread().interrupt(); } } diff --git a/src/main/java/com/att/dmf/mr/backends/kafka/KafkaLiveLockAvoider2.java b/src/main/java/com/att/dmf/mr/backends/kafka/KafkaLiveLockAvoider2.java index 805701a..f521b41 100644 --- a/src/main/java/com/att/dmf/mr/backends/kafka/KafkaLiveLockAvoider2.java +++ b/src/main/java/com/att/dmf/mr/backends/kafka/KafkaLiveLockAvoider2.java @@ -35,9 +35,7 @@ import com.att.eelf.configuration.EELFLogger; import com.att.eelf.configuration.EELFManager; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; -import org.springframework.context.annotation.ComponentScan; import org.springframework.stereotype.Component; -import org.springframework.stereotype.Service; //@ComponentScan(basePackages="com.att.dmf.mr.backends.kafka") @Component @@ -57,7 +55,7 @@ public class KafkaLiveLockAvoider2 { @PostConstruct public void init() { - System.out.println("Welcome......................................................................................"); + log.info("Welcome......................................................................................"); try { if (curatorFramework.checkExists().forPath(locksPath) == null) { curatorFramework.create().creatingParentsIfNeeded().forPath(locksPath); @@ -67,7 +65,7 @@ public class KafkaLiveLockAvoider2 { } } catch (Exception e) { - //e.printStackTrace(); + log.error("Error during creation of permanent Znodes under /live-lock-avoid ",e); } @@ -138,7 +136,7 @@ public class KafkaLiveLockAvoider2 { protected void assignNewProcessNode(String appId, Watcher processNodeWatcher ) { String taskHolderZnodePath = ZNODE_ROOT+ZNODE_UNSTICK_TASKS+"/"+appId; - //Watcher processNodeWatcher = createWatcher(); + try { diff --git a/src/main/java/com/att/dmf/mr/backends/memory/MemoryConsumerFactory.java b/src/main/java/com/att/dmf/mr/backends/memory/MemoryConsumerFactory.java index 0c34bfd..237cac8 100644 --- a/src/main/java/com/att/dmf/mr/backends/memory/MemoryConsumerFactory.java +++ b/src/main/java/com/att/dmf/mr/backends/memory/MemoryConsumerFactory.java @@ -35,6 +35,9 @@ import com.att.dmf.mr.backends.ConsumerFactory; */ public class MemoryConsumerFactory implements ConsumerFactory { + + private final MemoryQueue fQueue; + /** * * Initializing constructor @@ -59,8 +62,6 @@ public class MemoryConsumerFactory implements ConsumerFactory return new MemoryConsumer ( topic, consumerGroupId ); } - private final MemoryQueue fQueue; - /** * * Define nested inner class @@ -68,6 +69,12 @@ public class MemoryConsumerFactory implements ConsumerFactory */ private class MemoryConsumer implements Consumer { + + private final String fTopic; + private final String fConsumer; + private final long fCreateMs; + private long fLastAccessMs; + /** * * Initializing MemoryConsumer constructor @@ -93,11 +100,6 @@ public class MemoryConsumerFactory implements ConsumerFactory return fQueue.get ( fTopic, fConsumer ); } - private final String fTopic; - private final String fConsumer; - private final long fCreateMs; - private long fLastAccessMs; - @Override public boolean close() { //Nothing to close/clean up. @@ -168,7 +170,7 @@ public class MemoryConsumerFactory implements ConsumerFactory */ public Collection<? extends Consumer> getConsumers () { - return new ArrayList<MemoryConsumer> (); + return new ArrayList<> (); } @Override diff --git a/src/main/java/com/att/dmf/mr/backends/memory/MemoryMetaBroker.java b/src/main/java/com/att/dmf/mr/backends/memory/MemoryMetaBroker.java index 8c841d4..e0c80bd 100644 --- a/src/main/java/com/att/dmf/mr/backends/memory/MemoryMetaBroker.java +++ b/src/main/java/com/att/dmf/mr/backends/memory/MemoryMetaBroker.java @@ -39,6 +39,10 @@ import com.att.nsa.security.NsaApiKey; * */ public class MemoryMetaBroker implements Broker { + + private final MemoryQueue fQueue; + private final HashMap<String, MemTopic> fTopics; + /** * * @param mq @@ -78,10 +82,16 @@ public class MemoryMetaBroker implements Broker { fQueue.removeTopic(topic); } - private final MemoryQueue fQueue; - private final HashMap<String, MemTopic> fTopics; - private static class MemTopic implements Topic { + + private final String fName; + private final String fDesc; + private final String fOwner; + private NsaAcl fReaders; + private NsaAcl fWriters; + private boolean ftransactionEnabled; + private String accessDenied = "User does not own this topic "; + /** * constructor initialization * @@ -141,7 +151,7 @@ public class MemoryMetaBroker implements Broker { @Override public void permitWritesFromUser(String publisherId, NsaApiKey asUser) throws AccessDeniedException { if (!fOwner.equals(asUser.getKey())) { - throw new AccessDeniedException("User does not own this topic " + fName); + throw new AccessDeniedException(accessDenied + fName); } if (fWriters == null) { fWriters = new NsaAcl(); @@ -152,7 +162,7 @@ public class MemoryMetaBroker implements Broker { @Override public void denyWritesFromUser(String publisherId, NsaApiKey asUser) throws AccessDeniedException { if (!fOwner.equals(asUser.getKey())) { - throw new AccessDeniedException("User does not own this topic " + fName); + throw new AccessDeniedException(accessDenied + fName); } fWriters.remove(publisherId); } @@ -160,7 +170,7 @@ public class MemoryMetaBroker implements Broker { @Override public void permitReadsByUser(String consumerId, NsaApiKey asUser) throws AccessDeniedException { if (!fOwner.equals(asUser.getKey())) { - throw new AccessDeniedException("User does not own this topic " + fName); + throw new AccessDeniedException(accessDenied + fName); } if (fReaders == null) { fReaders = new NsaAcl(); @@ -171,18 +181,11 @@ public class MemoryMetaBroker implements Broker { @Override public void denyReadsByUser(String consumerId, NsaApiKey asUser) throws AccessDeniedException { if (!fOwner.equals(asUser.getKey())) { - throw new AccessDeniedException("User does not own this topic " + fName); + throw new AccessDeniedException(accessDenied + fName); } fReaders.remove(consumerId); } - private final String fName; - private final String fDesc; - private final String fOwner; - private NsaAcl fReaders; - private NsaAcl fWriters; - private boolean ftransactionEnabled; - @Override public boolean isTransactionEnabled() { return ftransactionEnabled; diff --git a/src/main/java/com/att/dmf/mr/backends/memory/MemoryQueue.java b/src/main/java/com/att/dmf/mr/backends/memory/MemoryQueue.java index 0629972..8ab4619 100644 --- a/src/main/java/com/att/dmf/mr/backends/memory/MemoryQueue.java +++ b/src/main/java/com/att/dmf/mr/backends/memory/MemoryQueue.java @@ -43,7 +43,7 @@ public class MemoryQueue { * constructor storing hashMap objects in Queue and Offsets object */ public MemoryQueue() { - fQueue = new HashMap<String, LogBuffer>(); + fQueue = new HashMap<>(); fOffsets = new HashMap<String, HashMap<String, Integer>>(); } @@ -102,7 +102,7 @@ public class MemoryQueue { HashMap<String, Integer> offsetMap = fOffsets.get(consumerName); if (offsetMap == null) { - offsetMap = new HashMap<String, Integer>(); + offsetMap = new HashMap<>(); fOffsets.put(consumerName, offsetMap); } Integer offset = offsetMap.get(topic); @@ -169,7 +169,7 @@ public class MemoryQueue { public LogBuffer(int maxSize) { fBaseOffset = 0; fMaxSize = maxSize; - fList = new ArrayList<String>(); + fList = new ArrayList<>(); } /** diff --git a/src/main/java/com/att/dmf/mr/beans/DMaaPCambriaLimiter.java b/src/main/java/com/att/dmf/mr/beans/DMaaPCambriaLimiter.java index f0bb982..8cbf64f 100644 --- a/src/main/java/com/att/dmf/mr/beans/DMaaPCambriaLimiter.java +++ b/src/main/java/com/att/dmf/mr/beans/DMaaPCambriaLimiter.java @@ -35,9 +35,6 @@ import com.att.dmf.mr.exception.DMaaPResponseCode; import com.att.dmf.mr.exception.ErrorResponse; import com.att.dmf.mr.utils.Utils; -//import org.slf4j.Logger; -//import org.slf4j.LoggerFactory; - import com.att.eelf.configuration.EELFLogger; import com.att.eelf.configuration.EELFManager; import com.att.nsa.drumlin.service.standards.HttpStatusCodes; @@ -54,6 +51,15 @@ import com.att.nsa.metrics.impl.CdmRateTicker; */ @Component public class DMaaPCambriaLimiter { + private final HashMap<String, RateInfo> fRateInfo; + private final HashMap<String, RateInfoCheck> fRateInfoCheck; + private final double fMaxEmptyPollsPerMinute; + private final double fMaxPollsPerMinute; + private final int fWindowLengthMins; + private final long fSleepMs; + private final long fSleepMs1; + private static final EELFLogger log = EELFManager.getInstance().getLogger(DMaaPCambriaLimiter.class); + /** * constructor initializes * @@ -62,9 +68,8 @@ public class DMaaPCambriaLimiter { * @throws invalidSettingValue */ @Autowired - public DMaaPCambriaLimiter(@Qualifier("propertyReader") rrNvReadable settings) - throws missingReqdSetting, invalidSettingValue { - fRateInfo = new HashMap<String, RateInfo>(); + public DMaaPCambriaLimiter(@Qualifier("propertyReader") rrNvReadable settings) { + fRateInfo = new HashMap<>(); fRateInfoCheck = new HashMap<>(); fMaxEmptyPollsPerMinute = settings.getDouble(CambriaConstants.kSetting_MaxEmptyPollsPerMinute, CambriaConstants.kDefault_MaxEmptyPollsPerMinute); @@ -78,19 +83,7 @@ public class DMaaPCambriaLimiter { 5000); } - - /** - * static method provide the sleep time - * - * @param ratePerMinute - * @return - */ - public static long getSleepMsForRate(double ratePerMinute) { - if (ratePerMinute <= 0.0) - return 0; - return Math.max(1000, Math.round(60 * 1000 / ratePerMinute)); - } - + /** * Construct a rate limiter. * @@ -111,7 +104,7 @@ public class DMaaPCambriaLimiter { * @param windowLengthMins */ public DMaaPCambriaLimiter(double maxEmptyPollsPerMinute,double maxPollsPerMinute, int windowLengthMins, long sleepMs ,long sleepMS1) { - fRateInfo = new HashMap<String, RateInfo>(); + fRateInfo = new HashMap<>(); fRateInfoCheck = new HashMap<>(); fMaxEmptyPollsPerMinute = Math.max(0, maxEmptyPollsPerMinute); fMaxPollsPerMinute = Math.max(0, maxPollsPerMinute); @@ -121,6 +114,18 @@ public class DMaaPCambriaLimiter { } /** + * static method provide the sleep time + * + * @param ratePerMinute + * @return + */ + public static long getSleepMsForRate(double ratePerMinute) { + if (ratePerMinute <= 0.0) + return 0; + return Math.max(1000, Math.round(60 * 1000 / ratePerMinute)); + } + + /** * Tell the rate limiter about a call to a topic/group/id. If the rate is * too high, this call delays its return and throws an exception. * @@ -151,6 +156,7 @@ public class DMaaPCambriaLimiter { log.info(ri.getLabel() + ": " + "No sleep configured, just throwing error."); } } catch (InterruptedException e) { + log.error("Exception "+ e); // ignore } @@ -163,37 +169,7 @@ public class DMaaPCambriaLimiter { log.info(errRes.toString()); throw new CambriaApiException(errRes); } - /*if (fMaxPollsPerMinute <= 0) { - return; - } - final RateInfoCheck ric = getRateInfoCheck(topic, consumerGroup, clientId); - final double ratevalue = ric.onCall(); - if (ratevalue > fMaxPollsPerMinute) { - try { - log.warn(ri.getLabel() + ": " + rate + " empty replies/minute, limit is " + fMaxEmptyPollsPerMinute - + "."); - if (fSleepMs1 > fMaxPollsPerMinute) { - log.warn(ri.getLabel() + ": " + "Slowing response with " + fSleepMs - + " ms sleep, then responding in error."); - Thread.sleep(fSleepMs1); - ric.reset(); - } else { - log.info(ri.getLabel() + ": " + "No sleep configured, just throwing error."); - } - } catch (InterruptedException e) { - // ignore - } - - - ErrorResponse errRes = new ErrorResponse(HttpStatusCodes.k429_tooManyRequests, - DMaaPResponseCode.TOO_MANY_REQUESTS.getResponseCode(), - "This client is making too many requests " - + ",decrease the number of requests. ","",Utils.getFormattedDate(new Date()),topic,"","",consumerGroup+"/"+clientId,remoteHost); - - log.info(errRes.toString()); - throw new CambriaApiException(errRes); - }*/ } @@ -213,6 +189,8 @@ public class DMaaPCambriaLimiter { } private static class RateInfo { + private final String fLabel; + private final CdmRateTicker fCallRateSinceLastMsgSend; /** * constructor initialzes * @@ -244,14 +222,14 @@ public class DMaaPCambriaLimiter { fCallRateSinceLastMsgSend.tick(); return fCallRateSinceLastMsgSend.getRate(); } - - private final String fLabel; - private final CdmRateTicker fCallRateSinceLastMsgSend; } private static class RateInfoCheck { + + private final String fLabel; + private final CdmRateTicker fCallRateSinceLastMsgSend; /** * constructor initialzes * @@ -283,21 +261,10 @@ public class DMaaPCambriaLimiter { fCallRateSinceLastMsgSend.tick(); return fCallRateSinceLastMsgSend.getRate(); } - - private final String fLabel; - private final CdmRateTicker fCallRateSinceLastMsgSend; } - private final HashMap<String, RateInfo> fRateInfo; - private final HashMap<String, RateInfoCheck> fRateInfoCheck; - private final double fMaxEmptyPollsPerMinute; - private final double fMaxPollsPerMinute; - private final int fWindowLengthMins; - private final long fSleepMs; - private final long fSleepMs1; - //private static final Logger log = LoggerFactory.getLogger(DMaaPCambriaLimiter.class); - private static final EELFLogger log = EELFManager.getInstance().getLogger(DMaaPCambriaLimiter.class); + private RateInfo getRateInfo(String topic, String consumerGroup, String clientId) { final String key = makeKey(topic, consumerGroup, clientId); @@ -310,15 +277,7 @@ public class DMaaPCambriaLimiter { } - private RateInfoCheck getRateInfoCheck(String topic, String consumerGroup, String clientId) { - final String key = makeKey(topic, consumerGroup, clientId); - RateInfoCheck ri = fRateInfoCheck.get(key); - if (ri == null) { - ri = new RateInfoCheck(key, 1); - fRateInfoCheck.put(key, ri); - } - return ri; - } + diff --git a/src/main/java/com/att/dmf/mr/beans/DMaaPKafkaConsumerFactory.java b/src/main/java/com/att/dmf/mr/beans/DMaaPKafkaConsumerFactory.java index e4e09c8..f60fd53 100644 --- a/src/main/java/com/att/dmf/mr/beans/DMaaPKafkaConsumerFactory.java +++ b/src/main/java/com/att/dmf/mr/beans/DMaaPKafkaConsumerFactory.java @@ -185,9 +185,11 @@ public class DMaaPKafkaConsumerFactory implements ConsumerFactory { log.info("Creating Kafka consumer for group [" + consumerGroupName + "], consumer [" + consumerId + "], on topic [" + topic + "]."); - - fCache.signalOwnership(topic, consumerGroupName, consumerId); - + + if (fCache != null) { + fCache.signalOwnership(topic, consumerGroupName, consumerId); + } + final Properties props = createConsumerConfig(topic,consumerGroupName, consumerId); long fCreateTimeMs = System.currentTimeMillis(); KafkaConsumer<String, String> cc = new KafkaConsumer<>(props); diff --git a/src/main/java/com/att/dmf/mr/beans/DMaaPKafkaMetaBroker.java b/src/main/java/com/att/dmf/mr/beans/DMaaPKafkaMetaBroker.java index 643eae9..4bef985 100644 --- a/src/main/java/com/att/dmf/mr/beans/DMaaPKafkaMetaBroker.java +++ b/src/main/java/com/att/dmf/mr/beans/DMaaPKafkaMetaBroker.java @@ -50,7 +50,7 @@ import com.att.dmf.mr.utils.ConfigurationReader; //import org.apache.log4-j.Logger; import com.att.eelf.configuration.EELFLogger; import com.att.eelf.configuration.EELFManager; -//import com.att.dmf.mr.backends.kafka.kafka011.SettingsUtil; + import com.att.nsa.configs.ConfigDb; import com.att.nsa.configs.ConfigDbException; import com.att.nsa.configs.ConfigPath; @@ -85,11 +85,9 @@ public class DMaaPKafkaMetaBroker implements Broker1 { props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, fkafkaBrokers ); - /* props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='admin_secret';"); - props.put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT"); - props.put("sasl.mechanism", "PLAIN");*/ + fKafkaAdminClient=AdminClient.create ( props ); - // fKafkaAdminClient = null; + } //private static final Logger log = Logger.getLogger(DMaaPKafkaMetaBroker.class); @@ -122,23 +120,21 @@ public class DMaaPKafkaMetaBroker implements Broker1 { props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, fkafkaBrokers ); - /* props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='admin_secret';"); - props.put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT"); - props.put("sasl.mechanism", "PLAIN");*/ + fKafkaAdminClient=AdminClient.create ( props ); - // fKafkaAdminClient = null; + } public DMaaPKafkaMetaBroker( rrNvReadable settings, ZkClient zk, ConfigDb configDb,AdminClient client) { - //fSettings = settings; + fZk = zk; fCambriaConfig = configDb; fBaseTopicData = configDb.parse("/topics"); fKafkaAdminClient= client; - // fKafkaAdminClient = null; + } @@ -235,13 +231,13 @@ public class DMaaPKafkaMetaBroker implements Broker1 { } catch ( InterruptedException e ) { - //timer.fail ( "Timeout" ); + log.warn ( "Execution of describeTopics timed out." ); throw new ConfigDbException ( e ); } catch ( ExecutionException e ) { - //timer.fail ( "ExecutionError" ); + log.warn ( "Execution of describeTopics failed: " + e.getCause ().getMessage (), e.getCause () ); throw new ConfigDbException ( e.getCause () ); } @@ -256,16 +252,11 @@ public class DMaaPKafkaMetaBroker implements Broker1 { log.info("Loading zookeeper client for topic deletion."); // topic creation. (Otherwise, the topic is only partially created // in ZK.) - /*zkClient = new ZkClient(ConfigurationReader.getMainZookeeperConnectionString(), 10000, 10000, - ZKStringSerializer$.MODULE$); - String strkSettings_KafkaZookeeper = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,CambriaConstants.kSetting_ZkConfigDbServers); - if (null==strkSettings_KafkaZookeeper) strkSettings_KafkaZookeeper = CambriaConstants.kDefault_ZkConfigDbServers; - ZkUtils zkutils =new ZkUtils(zkClient , new ZkConnection(strkSettings_KafkaZookeeper),false); - */ + fKafkaAdminClient.deleteTopics(Arrays.asList(topic)); log.info("Zookeeper client loaded successfully. Deleting topic."); - //AdminUtils.deleteTopic(zkutils, topic); + } catch (Exception e) { log.error("Failed to delete topic [" + topic + "]. " + e.getMessage(), e); throw new ConfigDbException(e); diff --git a/src/main/java/com/att/dmf/mr/beans/DMaaPMetricsSet.java b/src/main/java/com/att/dmf/mr/beans/DMaaPMetricsSet.java index 9942837..4c9532b 100644 --- a/src/main/java/com/att/dmf/mr/beans/DMaaPMetricsSet.java +++ b/src/main/java/com/att/dmf/mr/beans/DMaaPMetricsSet.java @@ -84,9 +84,9 @@ public class DMaaPMetricsSet extends CdmMetricsRegistryImpl implements MetricsSe * * @param cs */ - //public DMaaPMetricsSet() { + public DMaaPMetricsSet(rrNvReadable cs) { - //fSettings = cs; + fVersion = new CdmStringConstant("Version " + CambriaApiVersionInfo.getVersion()); super.putItem("version", fVersion); diff --git a/src/main/java/com/att/dmf/mr/exception/DMaaPWebExceptionMapper.java b/src/main/java/com/att/dmf/mr/exception/DMaaPWebExceptionMapper.java index a971c3f..db691bd 100644 --- a/src/main/java/com/att/dmf/mr/exception/DMaaPWebExceptionMapper.java +++ b/src/main/java/com/att/dmf/mr/exception/DMaaPWebExceptionMapper.java @@ -35,7 +35,7 @@ import javax.ws.rs.ext.ExceptionMapper; import javax.ws.rs.ext.Provider; import org.apache.http.HttpStatus; -//import org.apache.log-4j.Logger; + import org.springframework.beans.factory.annotation.Autowired; import com.att.eelf.configuration.EELFLogger; @@ -51,8 +51,7 @@ import com.att.eelf.configuration.EELFManager; @Singleton public class DMaaPWebExceptionMapper implements ExceptionMapper<WebApplicationException>{ - //private static final Logger LOGGER = Logger - // .getLogger(DMaaPWebExceptionMapper.class); + private static final EELFLogger LOGGER = EELFManager.getInstance().getLogger(DMaaPWebExceptionMapper.class); private ErrorResponse errRes; diff --git a/src/main/java/com/att/dmf/mr/listener/CambriaServletContextListener.java b/src/main/java/com/att/dmf/mr/listener/CambriaServletContextListener.java index 6022b91..64b20e8 100644 --- a/src/main/java/com/att/dmf/mr/listener/CambriaServletContextListener.java +++ b/src/main/java/com/att/dmf/mr/listener/CambriaServletContextListener.java @@ -35,7 +35,7 @@ import com.att.eelf.configuration.EELFManager; public class CambriaServletContextListener implements ServletContextListener { DME2EndPointLoader loader = DME2EndPointLoader.getInstance(); -// private static Logger log = Logger.getLogger(CambriaServletContextListener.class); + private static final EELFLogger log = EELFManager.getInstance().getLogger(CambriaServletContextListener.class); diff --git a/src/main/java/com/att/dmf/mr/metabroker/Topic.java b/src/main/java/com/att/dmf/mr/metabroker/Topic.java index 422a2cc..d191070 100644 --- a/src/main/java/com/att/dmf/mr/metabroker/Topic.java +++ b/src/main/java/com/att/dmf/mr/metabroker/Topic.java @@ -39,16 +39,16 @@ public interface Topic extends ReadWriteSecuredResource * *//* public class AccessDeniedException extends Exception - { + *//** * AccessDenied Description *//* - public AccessDeniedException () { super ( "Access denied." ); } + *//** * AccessDenied Exception for the user while authenticating the user request * @param user *//* - public AccessDeniedException ( String user ) { super ( "Access denied for " + user ); } + private static final long serialVersionUID = 1L; }*/ diff --git a/src/main/java/com/att/dmf/mr/metrics/publisher/DMaaPCambriaClientFactory.java b/src/main/java/com/att/dmf/mr/metrics/publisher/DMaaPCambriaClientFactory.java index d02438f..9158c96 100644 --- a/src/main/java/com/att/dmf/mr/metrics/publisher/DMaaPCambriaClientFactory.java +++ b/src/main/java/com/att/dmf/mr/metrics/publisher/DMaaPCambriaClientFactory.java @@ -386,12 +386,7 @@ public class DMaaPCambriaClientFactory { * Your API secret * @return an identity manager */ - /* - * public static CambriaIdentityManager createIdentityManager ( - * Collection<String> hostSet, String apiKey, String apiSecret ) { final - * CambriaIdentityManager cim = new CambriaMetaClient ( hostSet ); - * cim.setApiCredentials ( apiKey, apiSecret ); return cim; } - */ + /** * Create a topic manager for working with topics. @@ -405,12 +400,7 @@ public class DMaaPCambriaClientFactory { * Your API secret * @return a topic manager */ - /* - * public static CambriaTopicManager createTopicManager ( Collection<String> - * hostSet, String apiKey, String apiSecret ) { final CambriaMetaClient tmi - * = new CambriaMetaClient ( hostSet ); tmi.setApiCredentials ( apiKey, - * apiSecret ); return tmi; } - */ + /** * Inject a consumer. Used to support unit tests. diff --git a/src/main/java/com/att/dmf/mr/metrics/publisher/impl/CambriaBaseClient.java b/src/main/java/com/att/dmf/mr/metrics/publisher/impl/CambriaBaseClient.java index 08b2fd1..ebdf3ed 100644 --- a/src/main/java/com/att/dmf/mr/metrics/publisher/impl/CambriaBaseClient.java +++ b/src/main/java/com/att/dmf/mr/metrics/publisher/impl/CambriaBaseClient.java @@ -31,7 +31,7 @@ import org.json.JSONArray; import org.json.JSONException; import com.att.dmf.mr.constants.CambriaConstants; -//import org.slf4j.Logger; + //import org.slf4j.LoggerFactory; import com.att.eelf.configuration.EELFLogger; import com.att.eelf.configuration.EELFManager; @@ -52,12 +52,12 @@ public class CambriaBaseClient extends HttpClient implements com.att.dmf.mr.metr public CambriaBaseClient ( Collection<String> hosts, String clientSignature ) throws MalformedURLException { - /*super ( hosts, CambriaConstants.kStdCambriaServicePort, clientSignature, - CacheUse.NONE, 1, 1, TimeUnit.MILLISECONDS );*/ + + super(ConnectionType.HTTP, hosts, CambriaConstants.kStdCambriaServicePort, clientSignature, CacheUse.NONE, 1, 1L, TimeUnit.MILLISECONDS, 32, 32, 600000); - //fLog = LoggerFactory.getLogger ( this.getClass().getName () ); + fLog = EELFManager.getInstance().getLogger(this.getClass().getName()); //( this.getClass().getName () ); } @@ -85,7 +85,7 @@ public class CambriaBaseClient extends HttpClient implements com.att.dmf.mr.metr { fLog = log; - //replaceLogger ( log ); + } public EELFLogger getLog () diff --git a/src/main/java/com/att/dmf/mr/metrics/publisher/impl/DMaaPCambriaSimplerBatchPublisher.java b/src/main/java/com/att/dmf/mr/metrics/publisher/impl/DMaaPCambriaSimplerBatchPublisher.java index 5b937c2..dee9e57 100644 --- a/src/main/java/com/att/dmf/mr/metrics/publisher/impl/DMaaPCambriaSimplerBatchPublisher.java +++ b/src/main/java/com/att/dmf/mr/metrics/publisher/impl/DMaaPCambriaSimplerBatchPublisher.java @@ -305,8 +305,8 @@ public class DMaaPCambriaSimplerBatchPublisher extends CambriaBaseClient // code from REST Client Starts - // final String serverCalculatedSignature = sha1HmacSigner.sign - // ("2015-09-21T11:38:19-0700", "iHAxArrj6Ve9JgmHvR077QiV"); + + Client client = ClientBuilder.newClient(); String metricTopicname = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"metrics.send.cambria.topic"); @@ -323,32 +323,19 @@ public class DMaaPCambriaSimplerBatchPublisher extends CambriaBaseClient Entity<byte[]> data = Entity.entity(baseStream.toByteArray(), "application/cambria"); Response response = target.request().post(data); - // header("X-CambriaAuth", - // "2OH46YIWa329QpEF:"+serverCalculatedSignature). - // header("X-CambriaDate", "2015-09-21T11:38:19-0700"). - // post(Entity.json(baseStream.toByteArray())); - + getLog().info("Response received :: " + response.getStatus()); getLog().info("Response received :: " + response.toString()); // code from REST Client Ends - /* - * final JSONObject result = post ( url, contentType, - * baseStream.toByteArray(), true ); final String logLine = - * "cambria reply ok (" + (Clock.now()-startMs) + " ms):" + - * result.toString (); getLog().info ( logLine ); - */ + fPending.clear(); return true; } catch (IllegalArgumentException x) { getLog().warn(x.getMessage(), x); } - /* - * catch ( HttpObjectNotFoundException x ) { getLog().warn ( - * x.getMessage(), x ); } catch ( HttpException x ) { getLog().warn ( - * x.getMessage(), x ); } - */ + catch (IOException x) { getLog().warn(x.getMessage(), x); } diff --git a/src/main/java/com/att/dmf/mr/resources/streamReaders/CambriaJsonStreamReader.java b/src/main/java/com/att/dmf/mr/resources/streamReaders/CambriaJsonStreamReader.java index 98ddb50..7a67c92 100644 --- a/src/main/java/com/att/dmf/mr/resources/streamReaders/CambriaJsonStreamReader.java +++ b/src/main/java/com/att/dmf/mr/resources/streamReaders/CambriaJsonStreamReader.java @@ -81,10 +81,7 @@ public class CambriaJsonStreamReader implements reader { final int c = fTokens.next(); - /*if (c ==','){ - fCloseCount++; - System.out.println("fCloseCount=" + fCloseCount +" fCount "+fCount); - }*/ + if (fIsList) { if (c == ']' || (fCount > 0 && c == 10)) return null; @@ -125,7 +122,7 @@ public class CambriaJsonStreamReader implements reader { * * @param o */ - //public msg(JSONObject o){} + public msg(JSONObject o) { diff --git a/src/main/java/com/att/dmf/mr/resources/streamReaders/CambriaRawStreamReader.java b/src/main/java/com/att/dmf/mr/resources/streamReaders/CambriaRawStreamReader.java index 376d140..f64c0de 100644 --- a/src/main/java/com/att/dmf/mr/resources/streamReaders/CambriaRawStreamReader.java +++ b/src/main/java/com/att/dmf/mr/resources/streamReaders/CambriaRawStreamReader.java @@ -137,5 +137,5 @@ public class CambriaRawStreamReader implements reader private final InputStream fStream; private final String fDefPart; private boolean fClosed; - //private String transactionId; + } diff --git a/src/main/java/com/att/dmf/mr/security/DMaaPAAFAuthenticatorImpl.java b/src/main/java/com/att/dmf/mr/security/DMaaPAAFAuthenticatorImpl.java index b550373..ed0893d 100644 --- a/src/main/java/com/att/dmf/mr/security/DMaaPAAFAuthenticatorImpl.java +++ b/src/main/java/com/att/dmf/mr/security/DMaaPAAFAuthenticatorImpl.java @@ -46,7 +46,7 @@ public class DMaaPAAFAuthenticatorImpl implements DMaaPAAFAuthenticator { auth = true; } - //System.out.println("role " +role +" user: "+ req.getRemoteUser() +" : auth="+auth); + return auth; } @@ -57,7 +57,7 @@ public class DMaaPAAFAuthenticatorImpl implements DMaaPAAFAuthenticator { String permission = ""; String nameSpace =""; if(topicName.contains(".") && topicName.contains("com.att")) { - //String topic = topicName.substring(topicName.lastIndexOf(".")+1); + nameSpace = topicName.substring(0,topicName.lastIndexOf(".")); } else { @@ -67,12 +67,7 @@ public class DMaaPAAFAuthenticatorImpl implements DMaaPAAFAuthenticator { if(null==nameSpace)nameSpace="com.att.dmaap.mr.ueb"; - /*ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN, - DMaaPResponseCode.TOPIC_NOT_IN_AAF.getResponseCode(), "Topic does not exist in AAF" - , null, Utils.getFormattedDate(new Date()), topicName, - null, null, null, null); - - throw new CambriaApiException(errRes);*/ + } permission = nameSpace+".mr.topic|:topic."+topicName+"|"+action; diff --git a/src/main/java/com/att/dmf/mr/security/impl/DMaaPMechIdAuthenticator.java b/src/main/java/com/att/dmf/mr/security/impl/DMaaPMechIdAuthenticator.java index e9f28ae..64dbc14 100644 --- a/src/main/java/com/att/dmf/mr/security/impl/DMaaPMechIdAuthenticator.java +++ b/src/main/java/com/att/dmf/mr/security/impl/DMaaPMechIdAuthenticator.java @@ -25,7 +25,7 @@ import javax.servlet.http.HttpServletRequest; import com.att.dmf.mr.beans.DMaaPContext; import com.att.dmf.mr.security.DMaaPAuthenticator; -//import com.att.nsa.security.db.NsaApiDb; + import com.att.eelf.configuration.EELFLogger; import com.att.eelf.configuration.EELFManager; import com.att.nsa.security.NsaApiKey; @@ -65,7 +65,7 @@ public class DMaaPMechIdAuthenticator <K extends NsaApiKey> implements DMaaPAuth log.info ( "AUTH-LOG(" + remoteAddr + "): " + msg ); } -// private final NsaApiDb<K> fDb; + //private static final Logger log = Logger.getLogger( MechIdAuthenticator.class.toString()); private static final EELFLogger log = EELFManager.getInstance().getLogger(MechIdAuthenticator.class); /** diff --git a/src/main/java/com/att/dmf/mr/security/impl/DMaaPOriginalUebAuthenticator.java b/src/main/java/com/att/dmf/mr/security/impl/DMaaPOriginalUebAuthenticator.java index a26c9e7..b1e28e7 100644 --- a/src/main/java/com/att/dmf/mr/security/impl/DMaaPOriginalUebAuthenticator.java +++ b/src/main/java/com/att/dmf/mr/security/impl/DMaaPOriginalUebAuthenticator.java @@ -54,9 +54,9 @@ public class DMaaPOriginalUebAuthenticator<K extends NsaApiKey> implements DMaaP public DMaaPOriginalUebAuthenticator(NsaApiDb<K> db, long requestTimeWindowMs) { fDb = db; fRequestTimeWindowMs = requestTimeWindowMs; - //fAuthenticators = new LinkedList<DMaaPAuthenticator<K>>(); + - //fAuthenticators.add(new DMaaPOriginalUebAuthenticator<K>(db, requestTimeWindowMs)); + } @@ -243,51 +243,51 @@ public class DMaaPOriginalUebAuthenticator<K extends NsaApiKey> implements DMaaP "EEEE, dd-MMM-yy HH:mm:ss zzz", }; - /*private static final String kDateFormats[] = { - // W3C date format (RFC 3339). - "yyyy-MM-dd'T'HH:mm:ssz", + + + - // Preferred HTTP date format (RFC 1123). - "EEE, dd MMM yyyy HH:mm:ss zzz", + + - // simple unix command line 'date' format - "EEE MMM dd HH:mm:ss z yyyy", + + - // Common date format (RFC 822). - "EEE, dd MMM yy HH:mm:ss z", "EEE, dd MMM yy HH:mm z", "dd MMM yy HH:mm:ss z", "dd MMM yy HH:mm z", + + - // Obsoleted HTTP date format (ANSI C asctime() format). - "EEE MMM dd HH:mm:ss yyyy", + + - // Obsoleted HTTP date format (RFC 1036). - "EEEE, dd-MMM-yy HH:mm:ss zzz", }; */ + + // logger declaration - //private static final Logger log = Logger.getLogger(DMaaPOriginalUebAuthenticator.class.toString()); + private static final EELFLogger log = EELFManager.getInstance().getLogger(DMaaPOriginalUebAuthenticator.class); @Override -// public K authenticate(DMaaPContext ctx) { + // TODO Auto-generated method stub - //return null; + //} public K authenticate(DMaaPContext ctx) { - /*final HttpServletRequest req = ctx.getRequest(); - for (DMaaPAuthenticator<K> a : fAuthenticators) { - if (a.qualify(req)) { - final K k = a.isAuthentic(req); - if (k != null) - return k; - } - // else: this request doesn't look right to the authenticator - }*/ + + + + + + + + + return null; } public void addAuthenticator ( DMaaPAuthenticator<K> a ) { - //this.fAuthenticators.add(a); + } - //private final LinkedList<DMaaPAuthenticator<K>> fAuthenticators; + }
\ No newline at end of file diff --git a/src/main/java/com/att/dmf/mr/service/impl/AdminServiceImpl.java b/src/main/java/com/att/dmf/mr/service/impl/AdminServiceImpl.java index 110970f..f7c48de 100644 --- a/src/main/java/com/att/dmf/mr/service/impl/AdminServiceImpl.java +++ b/src/main/java/com/att/dmf/mr/service/impl/AdminServiceImpl.java @@ -42,7 +42,7 @@ import com.att.nsa.configs.ConfigDbException; import com.att.nsa.limits.Blacklist; import com.att.nsa.security.NsaApiKey; import com.att.nsa.security.ReadWriteSecuredResource.AccessDeniedException; -//import com.att.sa.highlandPark.util.HpJsonUtil; + /** * @author muzainulhaque.qazi diff --git a/src/main/java/com/att/dmf/mr/service/impl/ApiKeysServiceImpl.java b/src/main/java/com/att/dmf/mr/service/impl/ApiKeysServiceImpl.java index c818f88..b0e8a86 100644 --- a/src/main/java/com/att/dmf/mr/service/impl/ApiKeysServiceImpl.java +++ b/src/main/java/com/att/dmf/mr/service/impl/ApiKeysServiceImpl.java @@ -54,7 +54,7 @@ import com.att.nsa.security.db.simple.NsaSimpleApiKey; @Service public class ApiKeysServiceImpl implements ApiKeysService { - //private Logger log = Logger.getLogger(ApiKeysServiceImpl.class.toString()); + private static final EELFLogger log = EELFManager.getInstance().getLogger(ApiKeysServiceImpl.class.toString()); /** * This method will provide all the ApiKeys present in kafka server. @@ -139,7 +139,7 @@ public class ApiKeysServiceImpl implements ApiKeysService { String kSetting_AllowAnonymousKeys= com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"apiKeys.allowAnonymous"); if(null==kSetting_AllowAnonymousKeys) kSetting_AllowAnonymousKeys ="false"; - // if ((contactEmail == null) || (contactEmail.length() == 0)) + if ( kSetting_AllowAnonymousKeys.equalsIgnoreCase("true") && !emailProvided ) { DMaaPResponseBuilder.respondWithErrorInJson(dmaapContext, 400, "You must provide an email address."); @@ -165,7 +165,7 @@ public class ApiKeysServiceImpl implements ApiKeysService { log.debug("=======ApiKeysServiceImpl: createApiKey : saving api key : " + key.toString() + "====="); apiKeyDb.saveApiKey(key); - // System.out.println("here4"); + // email out the secret to validate the email address if ( emailProvided ) { @@ -196,9 +196,7 @@ public class ApiKeysServiceImpl implements ApiKeysService { ); DMaaPResponseBuilder.respondOk(dmaapContext, o); - /*o.put("secret", "Emailed to " + contactEmail + "."); - DMaaPResponseBuilder.respondOk(dmaapContext, - o); */ + return; } else { log.debug("=======ApiKeysServiceImpl: createApiKey : Error in creating API Key.====="); diff --git a/src/main/java/com/att/dmf/mr/service/impl/EventsServiceImpl.java b/src/main/java/com/att/dmf/mr/service/impl/EventsServiceImpl.java index e9671ce..22b60fe 100644 --- a/src/main/java/com/att/dmf/mr/service/impl/EventsServiceImpl.java +++ b/src/main/java/com/att/dmf/mr/service/impl/EventsServiceImpl.java @@ -94,7 +94,7 @@ import com.att.nsa.util.rrConvertor; @Service public class EventsServiceImpl implements EventsService { // private static final Logger LOG = - // Logger.getLogger(EventsServiceImpl.class); + private static final EELFLogger LOG = EELFManager.getInstance().getLogger(EventsServiceImpl.class); private static final String BATCH_LENGTH = "event.batch.length"; @@ -103,10 +103,10 @@ public class EventsServiceImpl implements EventsService { private DMaaPErrorMessages errorMessages; //@Autowired - //KafkaLiveLockAvoider2 kafkaLiveLockAvoider; + // @Value("${metrics.send.cambria.topic}") - // private String metricsTopic; + public DMaaPErrorMessages getErrorMessages() { return errorMessages; @@ -133,7 +133,7 @@ public class EventsServiceImpl implements EventsService { CambriaApiException, IOException, DMaaPAccessDeniedException { final long startTime = System.currentTimeMillis(); final HttpServletRequest req = ctx.getRequest(); - //System.out.println(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>"+kafkaLiveLockAvoider); + boolean isAAFTopic = false; // was this host blacklisted? final String remoteAddr = Utils.getRemoteAddress(ctx); @@ -158,7 +158,7 @@ public class EventsServiceImpl implements EventsService { if (strtimeoutMS != null) timeoutMs = Integer.parseInt(strtimeoutMS); // int timeoutMs = ctx.getConfigReader().getSettings().getInt("timeout", - // CambriaConstants.kNoTimeout); + if (req.getParameter("timeout") != null) { timeoutMs = Integer.parseInt(req.getParameter("timeout")); } @@ -214,7 +214,7 @@ public class EventsServiceImpl implements EventsService { // if headers are not provided then user will be null if (user == null && null != ctx.getRequest().getHeader("Authorization")) { // the topic name will be sent by the client - // String permission = "com.att.dmaap.mr.topic"+"|"+topic+"|"+"sub"; + DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl(); String permission = aaf.aafPermissionString(topic, "sub"); if (!aaf.aafAuthentication(ctx.getRequest(), permission)) { @@ -234,7 +234,7 @@ public class EventsServiceImpl implements EventsService { logger.info("Time taken in getEvents Authorization " + elapsedMs1 + " ms for " + topic + " " + consumerGroup + " " + clientId); Consumer c = null; - // String localclientId = clientId; + String lhostId = com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "clusterhostid"); if (null == lhostId) { diff --git a/src/main/java/com/att/dmf/mr/service/impl/MetricsServiceImpl.java b/src/main/java/com/att/dmf/mr/service/impl/MetricsServiceImpl.java index 83b3770..d867ea8 100644 --- a/src/main/java/com/att/dmf/mr/service/impl/MetricsServiceImpl.java +++ b/src/main/java/com/att/dmf/mr/service/impl/MetricsServiceImpl.java @@ -49,7 +49,7 @@ import com.att.nsa.metrics.CdmMeasuredItem; @Component public class MetricsServiceImpl implements MetricsService { - //private static final Logger LOG = Logger.getLogger(MetricsService.class.toString()); + private static final EELFLogger LOG = EELFManager.getInstance().getLogger(MetricsService.class); /** * diff --git a/src/main/java/com/att/dmf/mr/service/impl/TopicServiceImpl.java b/src/main/java/com/att/dmf/mr/service/impl/TopicServiceImpl.java index 9918024..7e9d783 100644 --- a/src/main/java/com/att/dmf/mr/service/impl/TopicServiceImpl.java +++ b/src/main/java/com/att/dmf/mr/service/impl/TopicServiceImpl.java @@ -45,7 +45,7 @@ import com.att.dmf.mr.exception.DMaaPResponseCode; import com.att.dmf.mr.exception.ErrorResponse; import com.att.dmf.mr.metabroker.Broker.TopicExistsException; import com.att.dmf.mr.metabroker.Broker1; -//import com.att.dmf.mr.metabroker.Broker1; + import com.att.dmf.mr.metabroker.Topic; import com.att.dmf.mr.security.DMaaPAAFAuthenticator; import com.att.dmf.mr.security.DMaaPAAFAuthenticatorImpl; @@ -67,13 +67,13 @@ import com.att.nsa.security.ReadWriteSecuredResource.AccessDeniedException; public class TopicServiceImpl implements TopicService { // private static final Logger LOGGER = - // Logger.getLogger(TopicServiceImpl.class); + private static final EELFLogger LOGGER = EELFManager.getInstance().getLogger(TopicServiceImpl.class); @Autowired private DMaaPErrorMessages errorMessages; // @Value("${msgRtr.topicfactory.aaf}") - // private String mrFactory; + public DMaaPErrorMessages getErrorMessages() { return errorMessages; @@ -125,7 +125,7 @@ public class TopicServiceImpl implements TopicService { for (Topic topic : getMetaBroker(dmaapContext).getAllTopics()) { JSONObject obj = new JSONObject(); obj.put("topicName", topic.getName()); - // obj.put("description", topic.getDescription()); + obj.put("owner", topic.getOwner()); obj.put("txenabled", topic.isTransactionEnabled()); topicsList.put(obj); @@ -193,7 +193,7 @@ public class TopicServiceImpl implements TopicService { final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(dmaapContext); String key = null; - //String appName = dmaapContext.getRequest().getHeader("AppName"); + String enfTopicName = com.att.ajsc.beans.PropertiesMapBean.getProperty(CambriaConstants.msgRtr_prop, "enforced.topic.name.AAF"); @@ -209,55 +209,55 @@ public class TopicServiceImpl implements TopicService { "Failed to create topic: Access Denied.User does not have permission to perform create topic"); LOGGER.info(errRes.toString()); - // throw new DMaaPAccessDeniedException(errRes); + } } - // else if (user==null && + // (null==dmaapContext.getRequest().getHeader("Authorization") && null - // == dmaapContext.getRequest().getHeader("cookie")) ) { - /*else if (user == null && null == dmaapContext.getRequest().getHeader("Authorization") - ) { - LOGGER.error("Failed to create topic" + topicBean.getTopicName() + ", Authentication failed."); + + + + - ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED, - DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(), - "Failed to create topic: Access Denied.User does not have permission to perform create topic"); + + + - LOGGER.info(errRes.toString()); - // throw new DMaaPAccessDeniedException(errRes); - }*/ + + + if (user == null /*&& (null != dmaapContext.getRequest().getHeader("Authorization") )*/) { - // if (user == null && + // (null!=dmaapContext.getRequest().getHeader("Authorization") || - // null != dmaapContext.getRequest().getHeader("cookie"))) { + // ACL authentication is not provided so we will use the aaf // authentication - /*LOGGER.info("Authorization the topic"); + - String permission = ""; - String nameSpace = ""; - if (topicBean.getTopicName().indexOf(".") > 1) - nameSpace = topicBean.getTopicName().substring(0, topicBean.getTopicName().lastIndexOf(".")); + + + + - String mrFactoryVal = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, - "msgRtr.topicfactory.aaf"); + + - // AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,kSettings_KafkaZookeeper); + - permission = mrFactoryVal + nameSpace + "|create"; - DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();*/ + + - //if (!aaf.aafAuthentication(dmaapContext.getRequest(), permission)) { + if (false) { LOGGER.error("Failed to create topic" + topicBean.getTopicName() + ", Authentication failed."); ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED, DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(), "Failed to create topic: Access Denied.User does not have permission to create topic with perm " - //+ permission); + + "permission"); @@ -267,13 +267,13 @@ public class TopicServiceImpl implements TopicService { } else { // if user is null and aaf authentication is ok then key should // be "" - // key = ""; + /** * Added as part of AAF user it should return username */ - //key = dmaapContext.getRequest().getUserPrincipal().getName().toString(); - //key="admin"; + + //LOGGER.info("key ==================== " + key); } @@ -283,7 +283,7 @@ public class TopicServiceImpl implements TopicService { final String topicName = topicBean.getTopicName(); final String desc = topicBean.getTopicDescription(); int partition = topicBean.getPartitionCount(); - // int replica = topicBean.getReplicationCount(); + if (partition == 0) { partition = 8; } @@ -291,7 +291,7 @@ public class TopicServiceImpl implements TopicService { int replica = topicBean.getReplicationCount(); if (replica == 0) { - //replica = 3; + replica = 1; } final int replicas = replica; @@ -503,25 +503,25 @@ public class TopicServiceImpl implements TopicService { LOGGER.info("Granting write access to producer [" + producerId + "] for topic " + topicName); final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(dmaapContext); - // if (user == null) { + // // LOGGER.info("Authenticating the user, as ACL authentication is not - // provided"); + //// String permission = - // "com.att.dmaap.mr.topic"+"|"+topicName+"|"+"manage"; + // - // DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl(); - // String permission = aaf.aafPermissionString(topicName, "manage"); + + // if(!aaf.aafAuthentication(dmaapContext.getRequest(), permission)) // { // LOGGER.error("Failed to permit write access to producer [" + // producerId + "] for topic " + topicName - // + ". Authentication failed."); + // ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN, // DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(), // errorMessages.getNotPermitted1()+" <Grant publish permissions> - // "+errorMessages.getNotPermitted2()+ topicName); - // LOGGER.info(errRes); + + // throw new DMaaPAccessDeniedException(errRes); // } // } @@ -561,25 +561,25 @@ public class TopicServiceImpl implements TopicService { LOGGER.info("Revoking write access to producer [" + producerId + "] for topic " + topicName); final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(dmaapContext); - // if (user == null) { + // //// String permission = - // "com.att.dmaap.mr.topic"+"|"+topicName+"|"+"manage"; + // DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl(); // String permission = aaf.aafPermissionString(topicName, "manage"); // if(!aaf.aafAuthentication(dmaapContext.getRequest(), permission)) // { // LOGGER.error("Failed to revoke write access to producer [" + // producerId + "] for topic " + topicName - // + ". Authentication failed."); + // ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN, // DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(), // errorMessages.getNotPermitted1()+" <Revoke publish permissions> - // "+errorMessages.getNotPermitted2()+ topicName); - // LOGGER.info(errRes); + + // throw new DMaaPAccessDeniedException(errRes); // - // } + // } Topic topic = getMetaBroker(dmaapContext).getTopic(topicName); @@ -612,22 +612,22 @@ public class TopicServiceImpl implements TopicService { LOGGER.info("Granting read access to consumer [" + consumerId + "] for topic " + topicName); final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(dmaapContext); - // if (user == null) { + // //// String permission = - // "com.att.dmaap.mr.topic"+"|"+topicName+"|"+"manage"; - // DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl(); + + // String permission = aaf.aafPermissionString(topicName, "manage"); // if(!aaf.aafAuthentication(dmaapContext.getRequest(), permission)) // { // LOGGER.error("Failed to permit read access to consumer [" + // consumerId + "] for topic " + topicName - // + ". Authentication failed."); + // ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN, // DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(), // errorMessages.getNotPermitted1()+" <Grant consume permissions> - // "+errorMessages.getNotPermitted2()+ topicName); - // LOGGER.info(errRes); + + // throw new DMaaPAccessDeniedException(errRes); // } // } @@ -662,27 +662,26 @@ public class TopicServiceImpl implements TopicService { LOGGER.info("Revoking read access to consumer [" + consumerId + "] for topic " + topicName); final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(dmaapContext); - // if (user == null) { + //// String permission = - // "com.att.dmaap.mr.topic"+"|"+topicName+"|"+"manage"; + // DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl(); // String permission = aaf.aafPermissionString(topicName, "manage"); // if(!aaf.aafAuthentication(dmaapContext.getRequest(), permission)) // { // LOGGER.error("Failed to revoke read access to consumer [" + // consumerId + "] for topic " + topicName - // + ". Authentication failed."); + // ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN, // DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(), // errorMessages.getNotPermitted1()+" <Grant consume permissions> - // "+errorMessages.getNotPermitted2()+ topicName); + // LOGGER.info(errRes); // throw new DMaaPAccessDeniedException(errRes); // } // // - // } - + Topic topic = getMetaBroker(dmaapContext).getTopic(topicName); if (null == topic) { diff --git a/src/main/java/com/att/dmf/mr/service/impl/TransactionServiceImpl.java b/src/main/java/com/att/dmf/mr/service/impl/TransactionServiceImpl.java index ae2d863..3065928 100644 --- a/src/main/java/com/att/dmf/mr/service/impl/TransactionServiceImpl.java +++ b/src/main/java/com/att/dmf/mr/service/impl/TransactionServiceImpl.java @@ -52,7 +52,7 @@ public class TransactionServiceImpl implements TransactionService { throws ConfigDbException, IOException { /* - * ConfigurationReader configReader = dmaapContext.getConfigReader(); + * * LOG.info("configReader : "+configReader.toString()); * @@ -77,7 +77,7 @@ public class TransactionServiceImpl implements TransactionService { IOException { /* - * if (null != transactionId) { + * * ConfigurationReader configReader = dmaapContext.getConfigReader(); * diff --git a/src/main/java/com/att/dmf/mr/service/impl/UIServiceImpl.java b/src/main/java/com/att/dmf/mr/service/impl/UIServiceImpl.java index 33bc2f4..c8bb073 100644 --- a/src/main/java/com/att/dmf/mr/service/impl/UIServiceImpl.java +++ b/src/main/java/com/att/dmf/mr/service/impl/UIServiceImpl.java @@ -27,7 +27,7 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; -//import kafka.common.TopicExistsException; + import org.apache.kafka.common.errors.TopicExistsException; import org.json.JSONArray; import org.json.JSONObject; @@ -50,7 +50,7 @@ import com.att.nsa.security.db.simple.NsaSimpleApiKey; @Service public class UIServiceImpl implements UIService { - //private static final Logger LOGGER = Logger.getLogger(UIServiceImpl.class); + private static final EELFLogger LOGGER = EELFManager.getInstance().getLogger(UIServiceImpl.class); /** * Returning template of hello page diff --git a/src/main/java/com/att/dmf/mr/utils/ConfigurationReader.java b/src/main/java/com/att/dmf/mr/utils/ConfigurationReader.java index dd1e4eb..fdf2d28 100644 --- a/src/main/java/com/att/dmf/mr/utils/ConfigurationReader.java +++ b/src/main/java/com/att/dmf/mr/utils/ConfigurationReader.java @@ -55,7 +55,7 @@ import com.att.nsa.drumlin.till.nv.rrNvReadable.invalidSettingValue; import com.att.nsa.drumlin.till.nv.rrNvReadable.missingReqdSetting; import com.att.nsa.limits.Blacklist; import com.att.nsa.security.NsaAuthenticatorService; -//import com.att.nsa.security.authenticators.OriginalUebAuthenticator; + import com.att.nsa.security.db.BaseNsaApiDbImpl; import com.att.nsa.security.db.NsaApiDb; import com.att.nsa.security.db.NsaApiDb.KeyExistsException; @@ -70,7 +70,7 @@ import com.att.nsa.security.db.simple.NsaSimpleApiKeyFactory; @Component public class ConfigurationReader { -// private rrNvReadable settings; + private Broker1 fMetaBroker; private ConsumerFactory fConsumerFactory; private Publisher fPublisher; @@ -78,7 +78,7 @@ public class ConfigurationReader { @Autowired private DMaaPCambriaLimiter fRateLimiter; private NsaApiDb<NsaSimpleApiKey> fApiKeyDb; - /* private DMaaPTransactionObjDB<DMaaPTransactionObj> fTranDb; */ + private DMaaPAuthenticator<NsaSimpleApiKey> fSecurityManager; private NsaAuthenticatorService<NsaSimpleApiKey> nsaSecurityManager; private static CuratorFramework curator; @@ -90,7 +90,7 @@ public class ConfigurationReader { private Emailer fEmailer; private static final EELFLogger log = EELFManager.getInstance().getLogger(ConfigurationReader.class); - //private static final Logger log = Logger.getLogger(ConfigurationReader.class.toString()); + /** * constructor to initialize all the values @@ -129,7 +129,7 @@ public class ConfigurationReader { @Qualifier("dMaaPAuthenticatorImpl") DMaaPAuthenticator<NsaSimpleApiKey> fSecurityManager ) throws missingReqdSetting, invalidSettingValue, ServletException, KafkaConsumerCacheException, ConfigDbException { - //this.settings = settings; + this.fMetrics = fMetrics; this.zk = zk; this.fConfigDb = fConfigDb; @@ -137,18 +137,18 @@ public class ConfigurationReader { ConfigurationReader.curator = curator; this.fConsumerFactory = fConsumerFactory; this.fMetaBroker = fMetaBroker; - //System.out.println("SSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSs " + fMetaBroker); + this.q = q; this.mmb = mmb; this.fApiKeyDb = fApiKeyDb; - /* this.fTranDb = fTranDb; */ + this.fSecurityManager = fSecurityManager; long allowedtimeSkewMs=600000L; String strallowedTimeSkewM= com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"authentication.allowedTimeSkewMs"); if(null!=strallowedTimeSkewM)allowedtimeSkewMs= Long.parseLong(strallowedTimeSkewM); - // boolean requireSecureChannel = true; + //String strrequireSecureChannel= com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"aauthentication.requireSecureChannel"); //if(strrequireSecureChannel!=null)requireSecureChannel=Boolean.parseBoolean(strrequireSecureChannel); //this.nsaSecurityManager = new NsaAuthenticatorService<NsaSimpleApiKey>(this.fApiKeyDb, settings.getLong("authentication.allowedTimeSkewMs", 600000L), settings.getBoolean("authentication.requireSecureChannel", true)); diff --git a/src/main/java/com/att/dmf/mr/utils/DMaaPResponseBuilder.java b/src/main/java/com/att/dmf/mr/utils/DMaaPResponseBuilder.java index 4c38d57..214aac8 100644 --- a/src/main/java/com/att/dmf/mr/utils/DMaaPResponseBuilder.java +++ b/src/main/java/com/att/dmf/mr/utils/DMaaPResponseBuilder.java @@ -130,10 +130,10 @@ public class DMaaPResponseBuilder { */ public static void respondOkWithStream(DMaaPContext ctx, String mediaType, StreamWriter writer) throws IOException { ctx.getResponse().setStatus(200); - OutputStream os = getStreamForBinaryResponse(ctx, mediaType); - writer.write(os); - os.close(); - + try(OutputStream os = getStreamForBinaryResponse(ctx, mediaType)) { + writer.write(os); + } + } @@ -218,7 +218,7 @@ public class DMaaPResponseBuilder { /** * interface used to define write method for outputStream */ - public static abstract interface StreamWriter { + public abstract static interface StreamWriter { /** * abstract method used to write the response * @@ -252,27 +252,20 @@ public class DMaaPResponseBuilder { boolean fResponseEntityAllowed = (!(ctx.getRequest().getMethod().equalsIgnoreCase("HEAD"))); - - OutputStream os = null; - try{ + if (fResponseEntityAllowed) { - os = ctx.getResponse().getOutputStream(); - return os; + try(OutputStream os = ctx.getResponse().getOutputStream()){ + return os; + }catch (Exception e){ + log.error("Exception in getStreamForBinaryResponse",e); + throw new IOException(); + } } else { - os = new NullStream(); - return os; - } - }catch (Exception e){ - throw new IOException(); - - } - finally{ - if(null != os){ - try{ - os.close(); - }catch(Exception e) { - - } + try(OutputStream os = new NullStream()){ + return os; + }catch (Exception e){ + log.error("Exception in getStreamForBinaryResponse",e); + throw new IOException(); } } } diff --git a/src/main/java/com/att/dmf/mr/utils/PropertyReader.java b/src/main/java/com/att/dmf/mr/utils/PropertyReader.java index 58c9fc9..000869e 100644 --- a/src/main/java/com/att/dmf/mr/utils/PropertyReader.java +++ b/src/main/java/com/att/dmf/mr/utils/PropertyReader.java @@ -39,9 +39,9 @@ public class PropertyReader extends nvReadableStack { * initializing logger * */ - //private static final Logger LOGGER = Logger.getLogger(PropertyReader.class); + private static final EELFLogger log = EELFManager.getInstance().getLogger(PropertyReader.class); -// private static final String MSGRTR_PROPERTIES_FILE = "msgRtrApi.properties"; + /** * constructor initialization @@ -50,11 +50,11 @@ public class PropertyReader extends nvReadableStack { * */ public PropertyReader() throws loadException { - /* Map<String, String> argMap = new HashMap<String, String>(); - final String config = getSetting(argMap, CambriaConstants.kConfig, MSGRTR_PROPERTIES_FILE); - final URL settingStream = findStream(config, ConfigurationReader.class); - push(new nvPropertiesFile(settingStream)); - push(new nvReadableTable(argMap));*/ + + + + + } /** @@ -83,43 +83,43 @@ public class PropertyReader extends nvReadableStack { * @exception MalformedURLException * */ - /*public static URL findStream(final String resourceName, Class<?> clazz) { - try { - File file = new File(resourceName); + + + - if (file.isAbsolute()) { - return file.toURI().toURL(); - } + + + - String filesRoot = System.getProperty("RRWT_FILES", null); + - if (null != filesRoot) { + - String fullPath = filesRoot + "/" + resourceName; + - LOGGER.debug("Looking for [" + fullPath + "]."); + - file = new File(fullPath); - if (file.exists()) { - return file.toURI().toURL(); - } - } + + + + + - URL res = clazz.getClassLoader().getResource(resourceName); + - if (null != res) { - return res; - } + + + - res = ClassLoader.getSystemResource(resourceName); + + + + + + + + + + - if (null != res) { - return res; - } - } catch (MalformedURLException e) { - LOGGER.error("Unexpected failure to convert a local filename into a URL: " + e.getMessage(), e); - } - return null; - } -*/ } diff --git a/src/main/java/com/att/mr/apiServer/metrics/cambria/DMaaPMetricsSender.java b/src/main/java/com/att/mr/apiServer/metrics/cambria/DMaaPMetricsSender.java index 08380fb..0e2804e 100644 --- a/src/main/java/com/att/mr/apiServer/metrics/cambria/DMaaPMetricsSender.java +++ b/src/main/java/com/att/mr/apiServer/metrics/cambria/DMaaPMetricsSender.java @@ -86,7 +86,7 @@ public class DMaaPMetricsSender implements Runnable { String Setting_CambriaTopic=com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,kSetting_CambriaTopic); if(Setting_CambriaTopic==null) Setting_CambriaTopic = "msgrtr.apinode.metrics.dmaap"; - // Setting_CambriaBaseUrl=Setting_CambriaBaseUrl==null?defaultTopic:Setting_CambriaBaseUrl; + String Setting_CambriaSendFreqSecs=com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,kSetting_CambriaSendFreqSecs); @@ -179,7 +179,7 @@ public class DMaaPMetricsSender implements Runnable { private final CambriaPublisher fCambria; private final String fHostname; - //private static final Logger log = LoggerFactory.getLogger(MetricsSender.class); + private static final EELFLogger log = EELFManager.getInstance().getLogger(MetricsSender.class); /** diff --git a/src/main/java/com/att/mr/filter/ContentLengthFilter.java b/src/main/java/com/att/mr/filter/ContentLengthFilter.java index b99f9e6..26f58e0 100644 --- a/src/main/java/com/att/mr/filter/ContentLengthFilter.java +++ b/src/main/java/com/att/mr/filter/ContentLengthFilter.java @@ -52,7 +52,7 @@ public class ContentLengthFilter implements Filter { private FilterConfig filterConfig = null; DMaaPErrorMessages errorMessages = null; - //private Logger log = Logger.getLogger(ContentLengthFilter.class.toString()); + private static final EELFLogger log = EELFManager.getInstance().getLogger(ContentLengthFilter.class); /** * Default constructor. @@ -110,7 +110,7 @@ public class ContentLengthFilter implements Filter { DMaaPResponseCode.MSG_SIZE_EXCEEDS_MSG_LIMIT.getResponseCode(), errorMessages.getMsgSizeExceeds() + jsonObj.toString()); log.info(errRes.toString()); - // throw new CambriaApiException(errRes); + } } |