diff options
Diffstat (limited to 'src/main/java/com/att/dmf/mr/beans')
6 files changed, 79 insertions, 135 deletions
diff --git a/src/main/java/com/att/dmf/mr/beans/DMaaPCambriaLimiter.java b/src/main/java/com/att/dmf/mr/beans/DMaaPCambriaLimiter.java index 5f28367..8cbf64f 100644 --- a/src/main/java/com/att/dmf/mr/beans/DMaaPCambriaLimiter.java +++ b/src/main/java/com/att/dmf/mr/beans/DMaaPCambriaLimiter.java @@ -35,9 +35,6 @@ import com.att.dmf.mr.exception.DMaaPResponseCode; import com.att.dmf.mr.exception.ErrorResponse; import com.att.dmf.mr.utils.Utils; -//import org.slf4j.Logger; -//import org.slf4j.LoggerFactory; - import com.att.eelf.configuration.EELFLogger; import com.att.eelf.configuration.EELFManager; import com.att.nsa.drumlin.service.standards.HttpStatusCodes; @@ -54,6 +51,15 @@ import com.att.nsa.metrics.impl.CdmRateTicker; */ @Component public class DMaaPCambriaLimiter { + private final HashMap<String, RateInfo> fRateInfo; + private final HashMap<String, RateInfoCheck> fRateInfoCheck; + private final double fMaxEmptyPollsPerMinute; + private final double fMaxPollsPerMinute; + private final int fWindowLengthMins; + private final long fSleepMs; + private final long fSleepMs1; + private static final EELFLogger log = EELFManager.getInstance().getLogger(DMaaPCambriaLimiter.class); + /** * constructor initializes * @@ -62,10 +68,9 @@ public class DMaaPCambriaLimiter { * @throws invalidSettingValue */ @Autowired - public DMaaPCambriaLimiter(@Qualifier("propertyReader") rrNvReadable settings) - throws missingReqdSetting, invalidSettingValue { - fRateInfo = new HashMap<String, RateInfo>(); - fRateInfoCheck = new HashMap<String, RateInfoCheck>(); + public DMaaPCambriaLimiter(@Qualifier("propertyReader") rrNvReadable settings) { + fRateInfo = new HashMap<>(); + fRateInfoCheck = new HashMap<>(); fMaxEmptyPollsPerMinute = settings.getDouble(CambriaConstants.kSetting_MaxEmptyPollsPerMinute, CambriaConstants.kDefault_MaxEmptyPollsPerMinute); fMaxPollsPerMinute = settings.getDouble(CambriaConstants.kSetting_MaxPollsPerMinute, @@ -78,19 +83,7 @@ public class DMaaPCambriaLimiter { 5000); } - - /** - * static method provide the sleep time - * - * @param ratePerMinute - * @return - */ - public static long getSleepMsForRate(double ratePerMinute) { - if (ratePerMinute <= 0.0) - return 0; - return Math.max(1000, Math.round(60 * 1000 / ratePerMinute)); - } - + /** * Construct a rate limiter. * @@ -111,8 +104,8 @@ public class DMaaPCambriaLimiter { * @param windowLengthMins */ public DMaaPCambriaLimiter(double maxEmptyPollsPerMinute,double maxPollsPerMinute, int windowLengthMins, long sleepMs ,long sleepMS1) { - fRateInfo = new HashMap<String, RateInfo>(); - fRateInfoCheck = new HashMap<String, RateInfoCheck>(); + fRateInfo = new HashMap<>(); + fRateInfoCheck = new HashMap<>(); fMaxEmptyPollsPerMinute = Math.max(0, maxEmptyPollsPerMinute); fMaxPollsPerMinute = Math.max(0, maxPollsPerMinute); fWindowLengthMins = windowLengthMins; @@ -121,6 +114,18 @@ public class DMaaPCambriaLimiter { } /** + * static method provide the sleep time + * + * @param ratePerMinute + * @return + */ + public static long getSleepMsForRate(double ratePerMinute) { + if (ratePerMinute <= 0.0) + return 0; + return Math.max(1000, Math.round(60 * 1000 / ratePerMinute)); + } + + /** * Tell the rate limiter about a call to a topic/group/id. If the rate is * too high, this call delays its return and throws an exception. * @@ -151,6 +156,7 @@ public class DMaaPCambriaLimiter { log.info(ri.getLabel() + ": " + "No sleep configured, just throwing error."); } } catch (InterruptedException e) { + log.error("Exception "+ e); // ignore } @@ -163,37 +169,7 @@ public class DMaaPCambriaLimiter { log.info(errRes.toString()); throw new CambriaApiException(errRes); } - /*if (fMaxPollsPerMinute <= 0) { - return; - } - final RateInfoCheck ric = getRateInfoCheck(topic, consumerGroup, clientId); - final double ratevalue = ric.onCall(); - if (ratevalue > fMaxPollsPerMinute) { - try { - log.warn(ri.getLabel() + ": " + rate + " empty replies/minute, limit is " + fMaxEmptyPollsPerMinute - + "."); - if (fSleepMs1 > fMaxPollsPerMinute) { - log.warn(ri.getLabel() + ": " + "Slowing response with " + fSleepMs - + " ms sleep, then responding in error."); - Thread.sleep(fSleepMs1); - ric.reset(); - } else { - log.info(ri.getLabel() + ": " + "No sleep configured, just throwing error."); - } - } catch (InterruptedException e) { - // ignore - } - - - ErrorResponse errRes = new ErrorResponse(HttpStatusCodes.k429_tooManyRequests, - DMaaPResponseCode.TOO_MANY_REQUESTS.getResponseCode(), - "This client is making too many requests " - + ",decrease the number of requests. ","",Utils.getFormattedDate(new Date()),topic,"","",consumerGroup+"/"+clientId,remoteHost); - - log.info(errRes.toString()); - throw new CambriaApiException(errRes); - }*/ } @@ -213,6 +189,8 @@ public class DMaaPCambriaLimiter { } private static class RateInfo { + private final String fLabel; + private final CdmRateTicker fCallRateSinceLastMsgSend; /** * constructor initialzes * @@ -244,14 +222,14 @@ public class DMaaPCambriaLimiter { fCallRateSinceLastMsgSend.tick(); return fCallRateSinceLastMsgSend.getRate(); } - - private final String fLabel; - private final CdmRateTicker fCallRateSinceLastMsgSend; } private static class RateInfoCheck { + + private final String fLabel; + private final CdmRateTicker fCallRateSinceLastMsgSend; /** * constructor initialzes * @@ -283,21 +261,10 @@ public class DMaaPCambriaLimiter { fCallRateSinceLastMsgSend.tick(); return fCallRateSinceLastMsgSend.getRate(); } - - private final String fLabel; - private final CdmRateTicker fCallRateSinceLastMsgSend; } - private final HashMap<String, RateInfo> fRateInfo; - private final HashMap<String, RateInfoCheck> fRateInfoCheck; - private final double fMaxEmptyPollsPerMinute; - private final double fMaxPollsPerMinute; - private final int fWindowLengthMins; - private final long fSleepMs; - private final long fSleepMs1; - //private static final Logger log = LoggerFactory.getLogger(DMaaPCambriaLimiter.class); - private static final EELFLogger log = EELFManager.getInstance().getLogger(DMaaPCambriaLimiter.class); + private RateInfo getRateInfo(String topic, String consumerGroup, String clientId) { final String key = makeKey(topic, consumerGroup, clientId); @@ -310,15 +277,7 @@ public class DMaaPCambriaLimiter { } - private RateInfoCheck getRateInfoCheck(String topic, String consumerGroup, String clientId) { - final String key = makeKey(topic, consumerGroup, clientId); - RateInfoCheck ri = fRateInfoCheck.get(key); - if (ri == null) { - ri = new RateInfoCheck(key, 1); - fRateInfoCheck.put(key, ri); - } - return ri; - } + diff --git a/src/main/java/com/att/dmf/mr/beans/DMaaPKafkaConsumerFactory.java b/src/main/java/com/att/dmf/mr/beans/DMaaPKafkaConsumerFactory.java index 6fc0838..f60fd53 100644 --- a/src/main/java/com/att/dmf/mr/beans/DMaaPKafkaConsumerFactory.java +++ b/src/main/java/com/att/dmf/mr/beans/DMaaPKafkaConsumerFactory.java @@ -46,8 +46,8 @@ import com.att.dmf.mr.backends.kafka.KafkaLiveLockAvoider2; import com.att.dmf.mr.backends.kafka.LiveLockAvoidance; import com.att.dmf.mr.constants.CambriaConstants; import com.att.dmf.mr.utils.ConfigurationReader; -//import org.slf4j.Logger; -//import org.slf4j.LoggerFactory; + + import com.att.eelf.configuration.EELFLogger; import com.att.eelf.configuration.EELFManager; import com.att.nsa.drumlin.till.nv.rrNvReadable.missingReqdSetting; @@ -58,12 +58,9 @@ import com.att.nsa.drumlin.till.nv.rrNvReadable.missingReqdSetting; */ public class DMaaPKafkaConsumerFactory implements ConsumerFactory { - // private static final Logger log = LoggerFactory - // .getLogger(DMaaPKafkaConsumerFactory.class); + private static final EELFLogger log = EELFManager.getInstance().getLogger(DMaaPKafkaConsumerFactory.class); - // @Autowired - // private KafkaLiveLockAvoider kafkaLiveLockAvoider = new - // KafkaLiveLockAvoider(); + /** * constructor initialization @@ -106,8 +103,7 @@ public class DMaaPKafkaConsumerFactory implements ConsumerFactory { final boolean isCacheEnabled = kSetting_EnableCache; - // fCache = (isCacheEnabled) ? new KafkaConsumerCache(apiNodeId, - // metrics) : null; + fCache = null; if (isCacheEnabled) { fCache = KafkaConsumerCache.getInstance(); @@ -189,14 +185,15 @@ public class DMaaPKafkaConsumerFactory implements ConsumerFactory { log.info("Creating Kafka consumer for group [" + consumerGroupName + "], consumer [" + consumerId + "], on topic [" + topic + "]."); - - fCache.signalOwnership(topic, consumerGroupName, consumerId); - + + if (fCache != null) { + fCache.signalOwnership(topic, consumerGroupName, consumerId); + } + final Properties props = createConsumerConfig(topic,consumerGroupName, consumerId); long fCreateTimeMs = System.currentTimeMillis(); KafkaConsumer<String, String> cc = new KafkaConsumer<>(props); - kc = new Kafka011Consumer(topic, consumerGroupName, consumerId, cc, fkafkaLiveLockAvoider);// ,fCache.getkafkaLiveLockAvoiderObj() - // ); + kc = new Kafka011Consumer(topic, consumerGroupName, consumerId, cc, fkafkaLiveLockAvoider); log.info(" kafka stream created in " + (System.currentTimeMillis() - fCreateTimeMs)); if (fCache != null) { @@ -265,10 +262,9 @@ public class DMaaPKafkaConsumerFactory implements ConsumerFactory { private void transferSettingIfProvided(Properties target, String key, String prefix) { String keyVal = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, makeLongKey(key, prefix)); - // if (fSettings.hasValueFor(makeLongKey(key, prefix))) { + if (null != keyVal) { - // final String val = fSettings - // .getString(makeLongKey(key, prefix), ""); + log.info("Setting [" + key + "] to " + keyVal + "."); target.put(key, keyVal); } @@ -294,10 +290,8 @@ public class DMaaPKafkaConsumerFactory implements ConsumerFactory { props.put("group.id", fakeGroupName); props.put("enable.auto.commit", "false"); // 0.11 props.put("bootstrap.servers", fkafkaBrokers); - /*props.put("sasl.jaas.config", - "org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='admin_secret';"); - props.put("security.protocol", "SASL_PLAINTEXT"); - props.put("sasl.mechanism", "PLAIN");*/ + + props.put("client.id", consumerId); // additional settings: start with our defaults, then pull in configured diff --git a/src/main/java/com/att/dmf/mr/beans/DMaaPKafkaMetaBroker.java b/src/main/java/com/att/dmf/mr/beans/DMaaPKafkaMetaBroker.java index 643eae9..4bef985 100644 --- a/src/main/java/com/att/dmf/mr/beans/DMaaPKafkaMetaBroker.java +++ b/src/main/java/com/att/dmf/mr/beans/DMaaPKafkaMetaBroker.java @@ -50,7 +50,7 @@ import com.att.dmf.mr.utils.ConfigurationReader; //import org.apache.log4-j.Logger; import com.att.eelf.configuration.EELFLogger; import com.att.eelf.configuration.EELFManager; -//import com.att.dmf.mr.backends.kafka.kafka011.SettingsUtil; + import com.att.nsa.configs.ConfigDb; import com.att.nsa.configs.ConfigDbException; import com.att.nsa.configs.ConfigPath; @@ -85,11 +85,9 @@ public class DMaaPKafkaMetaBroker implements Broker1 { props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, fkafkaBrokers ); - /* props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='admin_secret';"); - props.put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT"); - props.put("sasl.mechanism", "PLAIN");*/ + fKafkaAdminClient=AdminClient.create ( props ); - // fKafkaAdminClient = null; + } //private static final Logger log = Logger.getLogger(DMaaPKafkaMetaBroker.class); @@ -122,23 +120,21 @@ public class DMaaPKafkaMetaBroker implements Broker1 { props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, fkafkaBrokers ); - /* props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='admin_secret';"); - props.put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT"); - props.put("sasl.mechanism", "PLAIN");*/ + fKafkaAdminClient=AdminClient.create ( props ); - // fKafkaAdminClient = null; + } public DMaaPKafkaMetaBroker( rrNvReadable settings, ZkClient zk, ConfigDb configDb,AdminClient client) { - //fSettings = settings; + fZk = zk; fCambriaConfig = configDb; fBaseTopicData = configDb.parse("/topics"); fKafkaAdminClient= client; - // fKafkaAdminClient = null; + } @@ -235,13 +231,13 @@ public class DMaaPKafkaMetaBroker implements Broker1 { } catch ( InterruptedException e ) { - //timer.fail ( "Timeout" ); + log.warn ( "Execution of describeTopics timed out." ); throw new ConfigDbException ( e ); } catch ( ExecutionException e ) { - //timer.fail ( "ExecutionError" ); + log.warn ( "Execution of describeTopics failed: " + e.getCause ().getMessage (), e.getCause () ); throw new ConfigDbException ( e.getCause () ); } @@ -256,16 +252,11 @@ public class DMaaPKafkaMetaBroker implements Broker1 { log.info("Loading zookeeper client for topic deletion."); // topic creation. (Otherwise, the topic is only partially created // in ZK.) - /*zkClient = new ZkClient(ConfigurationReader.getMainZookeeperConnectionString(), 10000, 10000, - ZKStringSerializer$.MODULE$); - String strkSettings_KafkaZookeeper = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,CambriaConstants.kSetting_ZkConfigDbServers); - if (null==strkSettings_KafkaZookeeper) strkSettings_KafkaZookeeper = CambriaConstants.kDefault_ZkConfigDbServers; - ZkUtils zkutils =new ZkUtils(zkClient , new ZkConnection(strkSettings_KafkaZookeeper),false); - */ + fKafkaAdminClient.deleteTopics(Arrays.asList(topic)); log.info("Zookeeper client loaded successfully. Deleting topic."); - //AdminUtils.deleteTopic(zkutils, topic); + } catch (Exception e) { log.error("Failed to delete topic [" + topic + "]. " + e.getMessage(), e); throw new ConfigDbException(e); diff --git a/src/main/java/com/att/dmf/mr/beans/DMaaPMetricsSet.java b/src/main/java/com/att/dmf/mr/beans/DMaaPMetricsSet.java index 9942837..4c9532b 100644 --- a/src/main/java/com/att/dmf/mr/beans/DMaaPMetricsSet.java +++ b/src/main/java/com/att/dmf/mr/beans/DMaaPMetricsSet.java @@ -84,9 +84,9 @@ public class DMaaPMetricsSet extends CdmMetricsRegistryImpl implements MetricsSe * * @param cs */ - //public DMaaPMetricsSet() { + public DMaaPMetricsSet(rrNvReadable cs) { - //fSettings = cs; + fVersion = new CdmStringConstant("Version " + CambriaApiVersionInfo.getVersion()); super.putItem("version", fVersion); diff --git a/src/main/java/com/att/dmf/mr/beans/DMaaPNsaApiDb.java b/src/main/java/com/att/dmf/mr/beans/DMaaPNsaApiDb.java index e29403f..963ff2d 100644 --- a/src/main/java/com/att/dmf/mr/beans/DMaaPNsaApiDb.java +++ b/src/main/java/com/att/dmf/mr/beans/DMaaPNsaApiDb.java @@ -23,7 +23,7 @@ package com.att.dmf.mr.beans; import java.security.Key; -//import org.apache.log4-j.Logger; + import org.springframework.beans.factory.annotation.Autowired; import com.att.dmf.mr.constants.CambriaConstants; @@ -48,11 +48,11 @@ import com.att.nsa.util.rrConvertor; */ public class DMaaPNsaApiDb { - //private rrNvReadable settings; + private DMaaPZkConfigDb cdb; //private static final Logger log = Logger - // .getLogger(DMaaPNsaApiDb.class.toString()); + private static final EELFLogger log = EELFManager.getInstance().getLogger(DMaaPNsaApiDb.class); /** @@ -63,7 +63,7 @@ public class DMaaPNsaApiDb { */ @Autowired public DMaaPNsaApiDb(rrNvReadable settings, DMaaPZkConfigDb cdb) { - //this.setSettings(settings); + this.setCdb(cdb); } /** @@ -79,16 +79,16 @@ public class DMaaPNsaApiDb { missingReqdSetting { // Cambria uses an encrypted api key db - //final String keyBase64 = settings.getString("cambria.secureConfig.key", null); + final String keyBase64 =com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"cambria.secureConfig.key"); - // final String initVectorBase64 = settings.getString( "cambria.secureConfig.iv", null); + final String initVectorBase64 =com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"cambria.secureConfig.iv"); // if neither value was provided, don't encrypt api key db if (keyBase64 == null && initVectorBase64 == null) { log.info("This server is configured to use an unencrypted API key database. See the settings documentation."); - return new BaseNsaApiDbImpl<NsaSimpleApiKey>(cdb, + return new BaseNsaApiDbImpl<>(cdb, new NsaSimpleApiKeyFactory()); } else if (keyBase64 == null) { // neither or both, otherwise something's goofed @@ -100,7 +100,7 @@ public class DMaaPNsaApiDb { log.info("This server is configured to use an encrypted API key database."); final Key key = EncryptingLayer.readSecretKey(keyBase64); final byte[] iv = rrConvertor.base64Decode(initVectorBase64); - return new EncryptingApiDbImpl<NsaSimpleApiKey>(cdb, + return new EncryptingApiDbImpl<>(cdb, new NsaSimpleApiKeyFactory(), key, iv); } } @@ -109,17 +109,17 @@ public class DMaaPNsaApiDb { * @return * returns settings */ -/* public rrNvReadable getSettings() { - return settings; - }*/ + + + /** * @param settings * set settings */ - /*public void setSettings(rrNvReadable settings) { - this.settings = settings; - }*/ + + + /** * @return diff --git a/src/main/java/com/att/dmf/mr/beans/DMaaPZkConfigDb.java b/src/main/java/com/att/dmf/mr/beans/DMaaPZkConfigDb.java index d543721..5aa25fa 100644 --- a/src/main/java/com/att/dmf/mr/beans/DMaaPZkConfigDb.java +++ b/src/main/java/com/att/dmf/mr/beans/DMaaPZkConfigDb.java @@ -26,7 +26,7 @@ import org.springframework.beans.factory.annotation.Qualifier; import com.att.dmf.mr.utils.ConfigurationReader; import com.att.nsa.configs.confimpl.ZkConfigDb; import com.att.nsa.drumlin.till.nv.rrNvReadable; -//import com.att.nsa.configs.confimpl.ZkConfigDb; + /** * Provide the zookeeper config db connection * @author nilanjana.maity @@ -42,7 +42,7 @@ public class DMaaPZkConfigDb extends ZkConfigDb { public DMaaPZkConfigDb(@Qualifier("dMaaPZkClient") DMaaPZkClient zk, @Qualifier("propertyReader") rrNvReadable settings) { - //super(com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,CambriaConstants.kSetting_ZkConfigDbRoot)==null?CambriaConstants.kDefault_ZkConfigDbRoot:com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,CambriaConstants.kSetting_ZkConfigDbRoot)); + super(ConfigurationReader.getMainZookeeperConnectionString(),ConfigurationReader.getMainZookeeperConnectionSRoot()); } |