summaryrefslogtreecommitdiffstats
path: root/src/main/java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java')
-rw-r--r--src/main/java/com/att/dmf/mr/beans/DMaaPCambriaLimiter.java71
1 files changed, 34 insertions, 37 deletions
diff --git a/src/main/java/com/att/dmf/mr/beans/DMaaPCambriaLimiter.java b/src/main/java/com/att/dmf/mr/beans/DMaaPCambriaLimiter.java
index f0bb982..8315632 100644
--- a/src/main/java/com/att/dmf/mr/beans/DMaaPCambriaLimiter.java
+++ b/src/main/java/com/att/dmf/mr/beans/DMaaPCambriaLimiter.java
@@ -35,9 +35,6 @@ import com.att.dmf.mr.exception.DMaaPResponseCode;
import com.att.dmf.mr.exception.ErrorResponse;
import com.att.dmf.mr.utils.Utils;
-//import org.slf4j.Logger;
-//import org.slf4j.LoggerFactory;
-
import com.att.eelf.configuration.EELFLogger;
import com.att.eelf.configuration.EELFManager;
import com.att.nsa.drumlin.service.standards.HttpStatusCodes;
@@ -54,6 +51,15 @@ import com.att.nsa.metrics.impl.CdmRateTicker;
*/
@Component
public class DMaaPCambriaLimiter {
+ private final HashMap<String, RateInfo> fRateInfo;
+ private final HashMap<String, RateInfoCheck> fRateInfoCheck;
+ private final double fMaxEmptyPollsPerMinute;
+ private final double fMaxPollsPerMinute;
+ private final int fWindowLengthMins;
+ private final long fSleepMs;
+ private final long fSleepMs1;
+ private static final EELFLogger log = EELFManager.getInstance().getLogger(DMaaPCambriaLimiter.class);
+
/**
* constructor initializes
*
@@ -62,9 +68,8 @@ public class DMaaPCambriaLimiter {
* @throws invalidSettingValue
*/
@Autowired
- public DMaaPCambriaLimiter(@Qualifier("propertyReader") rrNvReadable settings)
- throws missingReqdSetting, invalidSettingValue {
- fRateInfo = new HashMap<String, RateInfo>();
+ public DMaaPCambriaLimiter(@Qualifier("propertyReader") rrNvReadable settings) {
+ fRateInfo = new HashMap<>();
fRateInfoCheck = new HashMap<>();
fMaxEmptyPollsPerMinute = settings.getDouble(CambriaConstants.kSetting_MaxEmptyPollsPerMinute,
CambriaConstants.kDefault_MaxEmptyPollsPerMinute);
@@ -78,19 +83,7 @@ public class DMaaPCambriaLimiter {
5000);
}
-
- /**
- * static method provide the sleep time
- *
- * @param ratePerMinute
- * @return
- */
- public static long getSleepMsForRate(double ratePerMinute) {
- if (ratePerMinute <= 0.0)
- return 0;
- return Math.max(1000, Math.round(60 * 1000 / ratePerMinute));
- }
-
+
/**
* Construct a rate limiter.
*
@@ -111,7 +104,7 @@ public class DMaaPCambriaLimiter {
* @param windowLengthMins
*/
public DMaaPCambriaLimiter(double maxEmptyPollsPerMinute,double maxPollsPerMinute, int windowLengthMins, long sleepMs ,long sleepMS1) {
- fRateInfo = new HashMap<String, RateInfo>();
+ fRateInfo = new HashMap<>();
fRateInfoCheck = new HashMap<>();
fMaxEmptyPollsPerMinute = Math.max(0, maxEmptyPollsPerMinute);
fMaxPollsPerMinute = Math.max(0, maxPollsPerMinute);
@@ -121,6 +114,18 @@ public class DMaaPCambriaLimiter {
}
/**
+ * static method provide the sleep time
+ *
+ * @param ratePerMinute
+ * @return
+ */
+ public static long getSleepMsForRate(double ratePerMinute) {
+ if (ratePerMinute <= 0.0)
+ return 0;
+ return Math.max(1000, Math.round(60 * 1000 / ratePerMinute));
+ }
+
+ /**
* Tell the rate limiter about a call to a topic/group/id. If the rate is
* too high, this call delays its return and throws an exception.
*
@@ -151,6 +156,7 @@ public class DMaaPCambriaLimiter {
log.info(ri.getLabel() + ": " + "No sleep configured, just throwing error.");
}
} catch (InterruptedException e) {
+ log.error("Exception "+ e);
// ignore
}
@@ -213,6 +219,8 @@ public class DMaaPCambriaLimiter {
}
private static class RateInfo {
+ private final String fLabel;
+ private final CdmRateTicker fCallRateSinceLastMsgSend;
/**
* constructor initialzes
*
@@ -244,14 +252,14 @@ public class DMaaPCambriaLimiter {
fCallRateSinceLastMsgSend.tick();
return fCallRateSinceLastMsgSend.getRate();
}
-
- private final String fLabel;
- private final CdmRateTicker fCallRateSinceLastMsgSend;
}
private static class RateInfoCheck {
+
+ private final String fLabel;
+ private final CdmRateTicker fCallRateSinceLastMsgSend;
/**
* constructor initialzes
*
@@ -283,21 +291,10 @@ public class DMaaPCambriaLimiter {
fCallRateSinceLastMsgSend.tick();
return fCallRateSinceLastMsgSend.getRate();
}
-
- private final String fLabel;
- private final CdmRateTicker fCallRateSinceLastMsgSend;
}
- private final HashMap<String, RateInfo> fRateInfo;
- private final HashMap<String, RateInfoCheck> fRateInfoCheck;
- private final double fMaxEmptyPollsPerMinute;
- private final double fMaxPollsPerMinute;
- private final int fWindowLengthMins;
- private final long fSleepMs;
- private final long fSleepMs1;
- //private static final Logger log = LoggerFactory.getLogger(DMaaPCambriaLimiter.class);
- private static final EELFLogger log = EELFManager.getInstance().getLogger(DMaaPCambriaLimiter.class);
+
private RateInfo getRateInfo(String topic, String consumerGroup, String clientId) {
final String key = makeKey(topic, consumerGroup, clientId);
@@ -310,7 +307,7 @@ public class DMaaPCambriaLimiter {
}
- private RateInfoCheck getRateInfoCheck(String topic, String consumerGroup, String clientId) {
+ /* private RateInfoCheck getRateInfoCheck(String topic, String consumerGroup, String clientId) {
final String key = makeKey(topic, consumerGroup, clientId);
RateInfoCheck ri = fRateInfoCheck.get(key);
if (ri == null) {
@@ -318,7 +315,7 @@ public class DMaaPCambriaLimiter {
fRateInfoCheck.put(key, ri);
}
return ri;
- }
+ } */