path: root/src/main/java/com/att/dmf/mr/beans
Diffstat (limited to 'src/main/java/com/att/dmf/mr/beans')
-rw-r--r--  src/main/java/com/att/dmf/mr/beans/DMaaPCambriaLimiter.java        111
-rw-r--r--  src/main/java/com/att/dmf/mr/beans/DMaaPKafkaConsumerFactory.java   36
-rw-r--r--  src/main/java/com/att/dmf/mr/beans/DMaaPKafkaMetaBroker.java        31
-rw-r--r--  src/main/java/com/att/dmf/mr/beans/DMaaPMetricsSet.java              4
-rw-r--r--  src/main/java/com/att/dmf/mr/beans/DMaaPNsaApiDb.java               28
-rw-r--r--  src/main/java/com/att/dmf/mr/beans/DMaaPZkConfigDb.java              4
6 files changed, 79 insertions, 135 deletions
diff --git a/src/main/java/com/att/dmf/mr/beans/DMaaPCambriaLimiter.java b/src/main/java/com/att/dmf/mr/beans/DMaaPCambriaLimiter.java
index 5f28367..8cbf64f 100644
--- a/src/main/java/com/att/dmf/mr/beans/DMaaPCambriaLimiter.java
+++ b/src/main/java/com/att/dmf/mr/beans/DMaaPCambriaLimiter.java
@@ -35,9 +35,6 @@ import com.att.dmf.mr.exception.DMaaPResponseCode;
import com.att.dmf.mr.exception.ErrorResponse;
import com.att.dmf.mr.utils.Utils;
-//import org.slf4j.Logger;
-//import org.slf4j.LoggerFactory;
-
import com.att.eelf.configuration.EELFLogger;
import com.att.eelf.configuration.EELFManager;
import com.att.nsa.drumlin.service.standards.HttpStatusCodes;
@@ -54,6 +51,15 @@ import com.att.nsa.metrics.impl.CdmRateTicker;
*/
@Component
public class DMaaPCambriaLimiter {
+ private final HashMap<String, RateInfo> fRateInfo;
+ private final HashMap<String, RateInfoCheck> fRateInfoCheck;
+ private final double fMaxEmptyPollsPerMinute;
+ private final double fMaxPollsPerMinute;
+ private final int fWindowLengthMins;
+ private final long fSleepMs;
+ private final long fSleepMs1;
+ private static final EELFLogger log = EELFManager.getInstance().getLogger(DMaaPCambriaLimiter.class);
+
/**
* constructor initializes
*
@@ -62,10 +68,9 @@ public class DMaaPCambriaLimiter {
* @throws invalidSettingValue
*/
@Autowired
- public DMaaPCambriaLimiter(@Qualifier("propertyReader") rrNvReadable settings)
- throws missingReqdSetting, invalidSettingValue {
- fRateInfo = new HashMap<String, RateInfo>();
- fRateInfoCheck = new HashMap<String, RateInfoCheck>();
+ public DMaaPCambriaLimiter(@Qualifier("propertyReader") rrNvReadable settings) {
+ fRateInfo = new HashMap<>();
+ fRateInfoCheck = new HashMap<>();
fMaxEmptyPollsPerMinute = settings.getDouble(CambriaConstants.kSetting_MaxEmptyPollsPerMinute,
CambriaConstants.kDefault_MaxEmptyPollsPerMinute);
fMaxPollsPerMinute = settings.getDouble(CambriaConstants.kSetting_MaxPollsPerMinute,
@@ -78,19 +83,7 @@ public class DMaaPCambriaLimiter {
5000);
}
-
- /**
- * static method provide the sleep time
- *
- * @param ratePerMinute
- * @return
- */
- public static long getSleepMsForRate(double ratePerMinute) {
- if (ratePerMinute <= 0.0)
- return 0;
- return Math.max(1000, Math.round(60 * 1000 / ratePerMinute));
- }
-
+
/**
* Construct a rate limiter.
*
@@ -111,8 +104,8 @@ public class DMaaPCambriaLimiter {
* @param windowLengthMins
*/
public DMaaPCambriaLimiter(double maxEmptyPollsPerMinute,double maxPollsPerMinute, int windowLengthMins, long sleepMs ,long sleepMS1) {
- fRateInfo = new HashMap<String, RateInfo>();
- fRateInfoCheck = new HashMap<String, RateInfoCheck>();
+ fRateInfo = new HashMap<>();
+ fRateInfoCheck = new HashMap<>();
fMaxEmptyPollsPerMinute = Math.max(0, maxEmptyPollsPerMinute);
fMaxPollsPerMinute = Math.max(0, maxPollsPerMinute);
fWindowLengthMins = windowLengthMins;
@@ -121,6 +114,18 @@ public class DMaaPCambriaLimiter {
}
/**
+ * static method provide the sleep time
+ *
+ * @param ratePerMinute
+ * @return
+ */
+ public static long getSleepMsForRate(double ratePerMinute) {
+ if (ratePerMinute <= 0.0)
+ return 0;
+ return Math.max(1000, Math.round(60 * 1000 / ratePerMinute));
+ }
+
+ /**
* Tell the rate limiter about a call to a topic/group/id. If the rate is
* too high, this call delays its return and throws an exception.
*
@@ -151,6 +156,7 @@ public class DMaaPCambriaLimiter {
log.info(ri.getLabel() + ": " + "No sleep configured, just throwing error.");
}
} catch (InterruptedException e) {
+ log.error("Exception "+ e);
// ignore
}
@@ -163,37 +169,7 @@ public class DMaaPCambriaLimiter {
log.info(errRes.toString());
throw new CambriaApiException(errRes);
}
- /*if (fMaxPollsPerMinute <= 0) {
- return;
- }
- final RateInfoCheck ric = getRateInfoCheck(topic, consumerGroup, clientId);
- final double ratevalue = ric.onCall();
- if (ratevalue > fMaxPollsPerMinute) {
- try {
- log.warn(ri.getLabel() + ": " + rate + " empty replies/minute, limit is " + fMaxEmptyPollsPerMinute
- + ".");
- if (fSleepMs1 > fMaxPollsPerMinute) {
- log.warn(ri.getLabel() + ": " + "Slowing response with " + fSleepMs
- + " ms sleep, then responding in error.");
- Thread.sleep(fSleepMs1);
- ric.reset();
- } else {
- log.info(ri.getLabel() + ": " + "No sleep configured, just throwing error.");
- }
- } catch (InterruptedException e) {
- // ignore
- }
-
-
- ErrorResponse errRes = new ErrorResponse(HttpStatusCodes.k429_tooManyRequests,
- DMaaPResponseCode.TOO_MANY_REQUESTS.getResponseCode(),
- "This client is making too many requests "
- + ",decrease the number of requests. ","",Utils.getFormattedDate(new Date()),topic,"","",consumerGroup+"/"+clientId,remoteHost);
-
- log.info(errRes.toString());
- throw new CambriaApiException(errRes);
- }*/
}
@@ -213,6 +189,8 @@ public class DMaaPCambriaLimiter {
}
private static class RateInfo {
+ private final String fLabel;
+ private final CdmRateTicker fCallRateSinceLastMsgSend;
/**
* constructor initialzes
*
@@ -244,14 +222,14 @@ public class DMaaPCambriaLimiter {
fCallRateSinceLastMsgSend.tick();
return fCallRateSinceLastMsgSend.getRate();
}
-
- private final String fLabel;
- private final CdmRateTicker fCallRateSinceLastMsgSend;
}
private static class RateInfoCheck {
+
+ private final String fLabel;
+ private final CdmRateTicker fCallRateSinceLastMsgSend;
/**
* constructor initialzes
*
@@ -283,21 +261,10 @@ public class DMaaPCambriaLimiter {
fCallRateSinceLastMsgSend.tick();
return fCallRateSinceLastMsgSend.getRate();
}
-
- private final String fLabel;
- private final CdmRateTicker fCallRateSinceLastMsgSend;
}
- private final HashMap<String, RateInfo> fRateInfo;
- private final HashMap<String, RateInfoCheck> fRateInfoCheck;
- private final double fMaxEmptyPollsPerMinute;
- private final double fMaxPollsPerMinute;
- private final int fWindowLengthMins;
- private final long fSleepMs;
- private final long fSleepMs1;
- //private static final Logger log = LoggerFactory.getLogger(DMaaPCambriaLimiter.class);
- private static final EELFLogger log = EELFManager.getInstance().getLogger(DMaaPCambriaLimiter.class);
+
private RateInfo getRateInfo(String topic, String consumerGroup, String clientId) {
final String key = makeKey(topic, consumerGroup, clientId);
@@ -310,15 +277,7 @@ public class DMaaPCambriaLimiter {
}
- private RateInfoCheck getRateInfoCheck(String topic, String consumerGroup, String clientId) {
- final String key = makeKey(topic, consumerGroup, clientId);
- RateInfoCheck ri = fRateInfoCheck.get(key);
- if (ri == null) {
- ri = new RateInfoCheck(key, 1);
- fRateInfoCheck.put(key, ri);
- }
- return ri;
- }
+
diff --git a/src/main/java/com/att/dmf/mr/beans/DMaaPKafkaConsumerFactory.java b/src/main/java/com/att/dmf/mr/beans/DMaaPKafkaConsumerFactory.java
index 6fc0838..f60fd53 100644
--- a/src/main/java/com/att/dmf/mr/beans/DMaaPKafkaConsumerFactory.java
+++ b/src/main/java/com/att/dmf/mr/beans/DMaaPKafkaConsumerFactory.java
@@ -46,8 +46,8 @@ import com.att.dmf.mr.backends.kafka.KafkaLiveLockAvoider2;
import com.att.dmf.mr.backends.kafka.LiveLockAvoidance;
import com.att.dmf.mr.constants.CambriaConstants;
import com.att.dmf.mr.utils.ConfigurationReader;
-//import org.slf4j.Logger;
-//import org.slf4j.LoggerFactory;
+
+
import com.att.eelf.configuration.EELFLogger;
import com.att.eelf.configuration.EELFManager;
import com.att.nsa.drumlin.till.nv.rrNvReadable.missingReqdSetting;
@@ -58,12 +58,9 @@ import com.att.nsa.drumlin.till.nv.rrNvReadable.missingReqdSetting;
*/
public class DMaaPKafkaConsumerFactory implements ConsumerFactory {
- // private static final Logger log = LoggerFactory
- // .getLogger(DMaaPKafkaConsumerFactory.class);
+
private static final EELFLogger log = EELFManager.getInstance().getLogger(DMaaPKafkaConsumerFactory.class);
- // @Autowired
- // private KafkaLiveLockAvoider kafkaLiveLockAvoider = new
- // KafkaLiveLockAvoider();
+
/**
* constructor initialization
@@ -106,8 +103,7 @@ public class DMaaPKafkaConsumerFactory implements ConsumerFactory {
final boolean isCacheEnabled = kSetting_EnableCache;
- // fCache = (isCacheEnabled) ? new KafkaConsumerCache(apiNodeId,
- // metrics) : null;
+
fCache = null;
if (isCacheEnabled) {
fCache = KafkaConsumerCache.getInstance();
@@ -189,14 +185,15 @@ public class DMaaPKafkaConsumerFactory implements ConsumerFactory {
log.info("Creating Kafka consumer for group [" + consumerGroupName + "], consumer [" + consumerId
+ "], on topic [" + topic + "].");
-
- fCache.signalOwnership(topic, consumerGroupName, consumerId);
-
+
+ if (fCache != null) {
+ fCache.signalOwnership(topic, consumerGroupName, consumerId);
+ }
+
final Properties props = createConsumerConfig(topic,consumerGroupName, consumerId);
long fCreateTimeMs = System.currentTimeMillis();
KafkaConsumer<String, String> cc = new KafkaConsumer<>(props);
- kc = new Kafka011Consumer(topic, consumerGroupName, consumerId, cc, fkafkaLiveLockAvoider);// ,fCache.getkafkaLiveLockAvoiderObj()
- // );
+ kc = new Kafka011Consumer(topic, consumerGroupName, consumerId, cc, fkafkaLiveLockAvoider);
log.info(" kafka stream created in " + (System.currentTimeMillis() - fCreateTimeMs));
if (fCache != null) {
@@ -265,10 +262,9 @@ public class DMaaPKafkaConsumerFactory implements ConsumerFactory {
private void transferSettingIfProvided(Properties target, String key, String prefix) {
String keyVal = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, makeLongKey(key, prefix));
- // if (fSettings.hasValueFor(makeLongKey(key, prefix))) {
+
if (null != keyVal) {
- // final String val = fSettings
- // .getString(makeLongKey(key, prefix), "");
+
log.info("Setting [" + key + "] to " + keyVal + ".");
target.put(key, keyVal);
}
@@ -294,10 +290,8 @@ public class DMaaPKafkaConsumerFactory implements ConsumerFactory {
props.put("group.id", fakeGroupName);
props.put("enable.auto.commit", "false"); // 0.11
props.put("bootstrap.servers", fkafkaBrokers);
- /*props.put("sasl.jaas.config",
- "org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='admin_secret';");
- props.put("security.protocol", "SASL_PLAINTEXT");
- props.put("sasl.mechanism", "PLAIN");*/
+
+
props.put("client.id", consumerId);
// additional settings: start with our defaults, then pull in configured
diff --git a/src/main/java/com/att/dmf/mr/beans/DMaaPKafkaMetaBroker.java b/src/main/java/com/att/dmf/mr/beans/DMaaPKafkaMetaBroker.java
index 643eae9..4bef985 100644
--- a/src/main/java/com/att/dmf/mr/beans/DMaaPKafkaMetaBroker.java
+++ b/src/main/java/com/att/dmf/mr/beans/DMaaPKafkaMetaBroker.java
@@ -50,7 +50,7 @@ import com.att.dmf.mr.utils.ConfigurationReader;
//import org.apache.log4j.Logger;
import com.att.eelf.configuration.EELFLogger;
import com.att.eelf.configuration.EELFManager;
-//import com.att.dmf.mr.backends.kafka.kafka011.SettingsUtil;
+
import com.att.nsa.configs.ConfigDb;
import com.att.nsa.configs.ConfigDbException;
import com.att.nsa.configs.ConfigPath;
@@ -85,11 +85,9 @@ public class DMaaPKafkaMetaBroker implements Broker1 {
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, fkafkaBrokers );
- /* props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='admin_secret';");
- props.put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
- props.put("sasl.mechanism", "PLAIN");*/
+
fKafkaAdminClient=AdminClient.create ( props );
- // fKafkaAdminClient = null;
+
}
//private static final Logger log = Logger.getLogger(DMaaPKafkaMetaBroker.class);
@@ -122,23 +120,21 @@ public class DMaaPKafkaMetaBroker implements Broker1 {
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, fkafkaBrokers );
- /* props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='admin_secret';");
- props.put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
- props.put("sasl.mechanism", "PLAIN");*/
+
fKafkaAdminClient=AdminClient.create ( props );
- // fKafkaAdminClient = null;
+
}
public DMaaPKafkaMetaBroker( rrNvReadable settings,
ZkClient zk, ConfigDb configDb,AdminClient client) {
- //fSettings = settings;
+
fZk = zk;
fCambriaConfig = configDb;
fBaseTopicData = configDb.parse("/topics");
fKafkaAdminClient= client;
- // fKafkaAdminClient = null;
+
}
@@ -235,13 +231,13 @@ public class DMaaPKafkaMetaBroker implements Broker1 {
}
catch ( InterruptedException e )
{
- //timer.fail ( "Timeout" );
+
log.warn ( "Execution of describeTopics timed out." );
throw new ConfigDbException ( e );
}
catch ( ExecutionException e )
{
- //timer.fail ( "ExecutionError" );
+
log.warn ( "Execution of describeTopics failed: " + e.getCause ().getMessage (), e.getCause () );
throw new ConfigDbException ( e.getCause () );
}
@@ -256,16 +252,11 @@ public class DMaaPKafkaMetaBroker implements Broker1 {
log.info("Loading zookeeper client for topic deletion.");
// topic creation. (Otherwise, the topic is only partially created
// in ZK.)
- /*zkClient = new ZkClient(ConfigurationReader.getMainZookeeperConnectionString(), 10000, 10000,
- ZKStringSerializer$.MODULE$);
- String strkSettings_KafkaZookeeper = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,CambriaConstants.kSetting_ZkConfigDbServers);
- if (null==strkSettings_KafkaZookeeper) strkSettings_KafkaZookeeper = CambriaConstants.kDefault_ZkConfigDbServers;
- ZkUtils zkutils =new ZkUtils(zkClient , new ZkConnection(strkSettings_KafkaZookeeper),false);
- */
+
fKafkaAdminClient.deleteTopics(Arrays.asList(topic));
log.info("Zookeeper client loaded successfully. Deleting topic.");
- //AdminUtils.deleteTopic(zkutils, topic);
+
} catch (Exception e) {
log.error("Failed to delete topic [" + topic + "]. " + e.getMessage(), e);
throw new ConfigDbException(e);
diff --git a/src/main/java/com/att/dmf/mr/beans/DMaaPMetricsSet.java b/src/main/java/com/att/dmf/mr/beans/DMaaPMetricsSet.java
index 9942837..4c9532b 100644
--- a/src/main/java/com/att/dmf/mr/beans/DMaaPMetricsSet.java
+++ b/src/main/java/com/att/dmf/mr/beans/DMaaPMetricsSet.java
@@ -84,9 +84,9 @@ public class DMaaPMetricsSet extends CdmMetricsRegistryImpl implements MetricsSe
*
* @param cs
*/
- //public DMaaPMetricsSet() {
+
public DMaaPMetricsSet(rrNvReadable cs) {
- //fSettings = cs;
+
fVersion = new CdmStringConstant("Version " + CambriaApiVersionInfo.getVersion());
super.putItem("version", fVersion);
diff --git a/src/main/java/com/att/dmf/mr/beans/DMaaPNsaApiDb.java b/src/main/java/com/att/dmf/mr/beans/DMaaPNsaApiDb.java
index e29403f..963ff2d 100644
--- a/src/main/java/com/att/dmf/mr/beans/DMaaPNsaApiDb.java
+++ b/src/main/java/com/att/dmf/mr/beans/DMaaPNsaApiDb.java
@@ -23,7 +23,7 @@ package com.att.dmf.mr.beans;
import java.security.Key;
-//import org.apache.log4j.Logger;
+
import org.springframework.beans.factory.annotation.Autowired;
import com.att.dmf.mr.constants.CambriaConstants;
@@ -48,11 +48,11 @@ import com.att.nsa.util.rrConvertor;
*/
public class DMaaPNsaApiDb {
- //private rrNvReadable settings;
+
private DMaaPZkConfigDb cdb;
//private static final Logger log = Logger
- // .getLogger(DMaaPNsaApiDb.class.toString());
+
private static final EELFLogger log = EELFManager.getInstance().getLogger(DMaaPNsaApiDb.class);
/**
@@ -63,7 +63,7 @@ public class DMaaPNsaApiDb {
*/
@Autowired
public DMaaPNsaApiDb(rrNvReadable settings, DMaaPZkConfigDb cdb) {
- //this.setSettings(settings);
+
this.setCdb(cdb);
}
/**
@@ -79,16 +79,16 @@ public class DMaaPNsaApiDb {
missingReqdSetting {
// Cambria uses an encrypted api key db
- //final String keyBase64 = settings.getString("cambria.secureConfig.key", null);
+
final String keyBase64 =com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"cambria.secureConfig.key");
- // final String initVectorBase64 = settings.getString( "cambria.secureConfig.iv", null);
+
final String initVectorBase64 =com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"cambria.secureConfig.iv");
// if neither value was provided, don't encrypt api key db
if (keyBase64 == null && initVectorBase64 == null) {
log.info("This server is configured to use an unencrypted API key database. See the settings documentation.");
- return new BaseNsaApiDbImpl<NsaSimpleApiKey>(cdb,
+ return new BaseNsaApiDbImpl<>(cdb,
new NsaSimpleApiKeyFactory());
} else if (keyBase64 == null) {
// neither or both, otherwise something's goofed
@@ -100,7 +100,7 @@ public class DMaaPNsaApiDb {
log.info("This server is configured to use an encrypted API key database.");
final Key key = EncryptingLayer.readSecretKey(keyBase64);
final byte[] iv = rrConvertor.base64Decode(initVectorBase64);
- return new EncryptingApiDbImpl<NsaSimpleApiKey>(cdb,
+ return new EncryptingApiDbImpl<>(cdb,
new NsaSimpleApiKeyFactory(), key, iv);
}
}
@@ -109,17 +109,17 @@ public class DMaaPNsaApiDb {
* @return
* returns settings
*/
-/* public rrNvReadable getSettings() {
- return settings;
- }*/
+
+
+
/**
* @param settings
* set settings
*/
- /*public void setSettings(rrNvReadable settings) {
- this.settings = settings;
- }*/
+
+
+
/**
* @return
diff --git a/src/main/java/com/att/dmf/mr/beans/DMaaPZkConfigDb.java b/src/main/java/com/att/dmf/mr/beans/DMaaPZkConfigDb.java
index d543721..5aa25fa 100644
--- a/src/main/java/com/att/dmf/mr/beans/DMaaPZkConfigDb.java
+++ b/src/main/java/com/att/dmf/mr/beans/DMaaPZkConfigDb.java
@@ -26,7 +26,7 @@ import org.springframework.beans.factory.annotation.Qualifier;
import com.att.dmf.mr.utils.ConfigurationReader;
import com.att.nsa.configs.confimpl.ZkConfigDb;
import com.att.nsa.drumlin.till.nv.rrNvReadable;
-//import com.att.nsa.configs.confimpl.ZkConfigDb;
+
/**
* Provide the zookeeper config db connection
* @author nilanjana.maity
@@ -42,7 +42,7 @@ public class DMaaPZkConfigDb extends ZkConfigDb {
public DMaaPZkConfigDb(@Qualifier("dMaaPZkClient") DMaaPZkClient zk,
@Qualifier("propertyReader") rrNvReadable settings) {
- //super(com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,CambriaConstants.kSetting_ZkConfigDbRoot)==null?CambriaConstants.kDefault_ZkConfigDbRoot:com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,CambriaConstants.kSetting_ZkConfigDbRoot));
+
super(ConfigurationReader.getMainZookeeperConnectionString(),ConfigurationReader.getMainZookeeperConnectionSRoot());
}