diff options
Diffstat (limited to 'src/main/java/com/att/dmf/mr/beans/DMaaPCambriaLimiter.java')
-rw-r--r-- | src/main/java/com/att/dmf/mr/beans/DMaaPCambriaLimiter.java | 111 |
1 files changed, 35 insertions, 76 deletions
diff --git a/src/main/java/com/att/dmf/mr/beans/DMaaPCambriaLimiter.java b/src/main/java/com/att/dmf/mr/beans/DMaaPCambriaLimiter.java index 5f28367..8cbf64f 100644 --- a/src/main/java/com/att/dmf/mr/beans/DMaaPCambriaLimiter.java +++ b/src/main/java/com/att/dmf/mr/beans/DMaaPCambriaLimiter.java @@ -35,9 +35,6 @@ import com.att.dmf.mr.exception.DMaaPResponseCode; import com.att.dmf.mr.exception.ErrorResponse; import com.att.dmf.mr.utils.Utils; -//import org.slf4j.Logger; -//import org.slf4j.LoggerFactory; - import com.att.eelf.configuration.EELFLogger; import com.att.eelf.configuration.EELFManager; import com.att.nsa.drumlin.service.standards.HttpStatusCodes; @@ -54,6 +51,15 @@ import com.att.nsa.metrics.impl.CdmRateTicker; */ @Component public class DMaaPCambriaLimiter { + private final HashMap<String, RateInfo> fRateInfo; + private final HashMap<String, RateInfoCheck> fRateInfoCheck; + private final double fMaxEmptyPollsPerMinute; + private final double fMaxPollsPerMinute; + private final int fWindowLengthMins; + private final long fSleepMs; + private final long fSleepMs1; + private static final EELFLogger log = EELFManager.getInstance().getLogger(DMaaPCambriaLimiter.class); + /** * constructor initializes * @@ -62,10 +68,9 @@ public class DMaaPCambriaLimiter { * @throws invalidSettingValue */ @Autowired - public DMaaPCambriaLimiter(@Qualifier("propertyReader") rrNvReadable settings) - throws missingReqdSetting, invalidSettingValue { - fRateInfo = new HashMap<String, RateInfo>(); - fRateInfoCheck = new HashMap<String, RateInfoCheck>(); + public DMaaPCambriaLimiter(@Qualifier("propertyReader") rrNvReadable settings) { + fRateInfo = new HashMap<>(); + fRateInfoCheck = new HashMap<>(); fMaxEmptyPollsPerMinute = settings.getDouble(CambriaConstants.kSetting_MaxEmptyPollsPerMinute, CambriaConstants.kDefault_MaxEmptyPollsPerMinute); fMaxPollsPerMinute = settings.getDouble(CambriaConstants.kSetting_MaxPollsPerMinute, @@ -78,19 +83,7 @@ public class DMaaPCambriaLimiter { 5000); } - - /** - * static method provide the sleep time - * - * @param ratePerMinute - * @return - */ - public static long getSleepMsForRate(double ratePerMinute) { - if (ratePerMinute <= 0.0) - return 0; - return Math.max(1000, Math.round(60 * 1000 / ratePerMinute)); - } - + /** * Construct a rate limiter. * @@ -111,8 +104,8 @@ public class DMaaPCambriaLimiter { * @param windowLengthMins */ public DMaaPCambriaLimiter(double maxEmptyPollsPerMinute,double maxPollsPerMinute, int windowLengthMins, long sleepMs ,long sleepMS1) { - fRateInfo = new HashMap<String, RateInfo>(); - fRateInfoCheck = new HashMap<String, RateInfoCheck>(); + fRateInfo = new HashMap<>(); + fRateInfoCheck = new HashMap<>(); fMaxEmptyPollsPerMinute = Math.max(0, maxEmptyPollsPerMinute); fMaxPollsPerMinute = Math.max(0, maxPollsPerMinute); fWindowLengthMins = windowLengthMins; @@ -121,6 +114,18 @@ public class DMaaPCambriaLimiter { } /** + * static method provide the sleep time + * + * @param ratePerMinute + * @return + */ + public static long getSleepMsForRate(double ratePerMinute) { + if (ratePerMinute <= 0.0) + return 0; + return Math.max(1000, Math.round(60 * 1000 / ratePerMinute)); + } + + /** * Tell the rate limiter about a call to a topic/group/id. If the rate is * too high, this call delays its return and throws an exception. * @@ -151,6 +156,7 @@ public class DMaaPCambriaLimiter { log.info(ri.getLabel() + ": " + "No sleep configured, just throwing error."); } } catch (InterruptedException e) { + log.error("Exception "+ e); // ignore } @@ -163,37 +169,7 @@ public class DMaaPCambriaLimiter { log.info(errRes.toString()); throw new CambriaApiException(errRes); } - /*if (fMaxPollsPerMinute <= 0) { - return; - } - final RateInfoCheck ric = getRateInfoCheck(topic, consumerGroup, clientId); - final double ratevalue = ric.onCall(); - if (ratevalue > fMaxPollsPerMinute) { - try { - log.warn(ri.getLabel() + ": " + rate + " empty replies/minute, limit is " + fMaxEmptyPollsPerMinute - + "."); - if (fSleepMs1 > fMaxPollsPerMinute) { - log.warn(ri.getLabel() + ": " + "Slowing response with " + fSleepMs - + " ms sleep, then responding in error."); - Thread.sleep(fSleepMs1); - ric.reset(); - } else { - log.info(ri.getLabel() + ": " + "No sleep configured, just throwing error."); - } - } catch (InterruptedException e) { - // ignore - } - - - ErrorResponse errRes = new ErrorResponse(HttpStatusCodes.k429_tooManyRequests, - DMaaPResponseCode.TOO_MANY_REQUESTS.getResponseCode(), - "This client is making too many requests " - + ",decrease the number of requests. ","",Utils.getFormattedDate(new Date()),topic,"","",consumerGroup+"/"+clientId,remoteHost); - - log.info(errRes.toString()); - throw new CambriaApiException(errRes); - }*/ } @@ -213,6 +189,8 @@ public class DMaaPCambriaLimiter { } private static class RateInfo { + private final String fLabel; + private final CdmRateTicker fCallRateSinceLastMsgSend; /** * constructor initialzes * @@ -244,14 +222,14 @@ public class DMaaPCambriaLimiter { fCallRateSinceLastMsgSend.tick(); return fCallRateSinceLastMsgSend.getRate(); } - - private final String fLabel; - private final CdmRateTicker fCallRateSinceLastMsgSend; } private static class RateInfoCheck { + + private final String fLabel; + private final CdmRateTicker fCallRateSinceLastMsgSend; /** * constructor initialzes * @@ -283,21 +261,10 @@ public class DMaaPCambriaLimiter { fCallRateSinceLastMsgSend.tick(); return fCallRateSinceLastMsgSend.getRate(); } - - private final String fLabel; - private final CdmRateTicker fCallRateSinceLastMsgSend; } - private final HashMap<String, RateInfo> fRateInfo; - private final HashMap<String, RateInfoCheck> fRateInfoCheck; - private final double fMaxEmptyPollsPerMinute; - private final double fMaxPollsPerMinute; - private final int fWindowLengthMins; - private final long fSleepMs; - private final long fSleepMs1; - //private static final Logger log = LoggerFactory.getLogger(DMaaPCambriaLimiter.class); - private static final EELFLogger log = EELFManager.getInstance().getLogger(DMaaPCambriaLimiter.class); + private RateInfo getRateInfo(String topic, String consumerGroup, String clientId) { final String key = makeKey(topic, consumerGroup, clientId); @@ -310,15 +277,7 @@ public class DMaaPCambriaLimiter { } - private RateInfoCheck getRateInfoCheck(String topic, String consumerGroup, String clientId) { - final String key = makeKey(topic, consumerGroup, clientId); - RateInfoCheck ri = fRateInfoCheck.get(key); - if (ri == null) { - ri = new RateInfoCheck(key, 1); - fRateInfoCheck.put(key, ri); - } - return ri; - } + |