summaryrefslogtreecommitdiffstats
path: root/src/main/java/org/onap/dmaap/dmf/mr/beans/DMaaPCambriaLimiter.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/org/onap/dmaap/dmf/mr/beans/DMaaPCambriaLimiter.java')
-rw-r--r--src/main/java/org/onap/dmaap/dmf/mr/beans/DMaaPCambriaLimiter.java247
1 files changed, 247 insertions, 0 deletions
diff --git a/src/main/java/org/onap/dmaap/dmf/mr/beans/DMaaPCambriaLimiter.java b/src/main/java/org/onap/dmaap/dmf/mr/beans/DMaaPCambriaLimiter.java
new file mode 100644
index 0000000..dd479b9
--- /dev/null
+++ b/src/main/java/org/onap/dmaap/dmf/mr/beans/DMaaPCambriaLimiter.java
@@ -0,0 +1,247 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+*
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+package org.onap.dmaap.dmf.mr.beans;
+
+import com.att.eelf.configuration.EELFLogger;
+import com.att.eelf.configuration.EELFManager;
+import com.att.nsa.drumlin.service.standards.HttpStatusCodes;
+import com.att.nsa.drumlin.till.nv.rrNvReadable;
+import com.att.nsa.drumlin.till.nv.rrNvReadable.invalidSettingValue;
+import com.att.nsa.drumlin.till.nv.rrNvReadable.missingReqdSetting;
+import com.att.nsa.metrics.impl.CdmRateTicker;
+import org.onap.dmaap.dmf.mr.CambriaApiException;
+import org.onap.dmaap.dmf.mr.constants.CambriaConstants;
+import org.onap.dmaap.dmf.mr.exception.DMaaPResponseCode;
+import org.onap.dmaap.dmf.mr.exception.ErrorResponse;
+import org.onap.dmaap.dmf.mr.utils.Utils;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.stereotype.Component;
+
+import java.util.Date;
+import java.util.HashMap;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * class provide rate information
+ *
+ * @author anowarul.islam
+ *
+ */
+@Component
+public class DMaaPCambriaLimiter {
+ private final HashMap<String, RateInfo> fRateInfo;
+ private final double fMaxEmptyPollsPerMinute;
+ private final double fMaxPollsPerMinute;
+ private final int fWindowLengthMins;
+ private final long fSleepMs;
+ private final long fSleepMs1;
+ private static final EELFLogger log = EELFManager.getInstance().getLogger(DMaaPCambriaLimiter.class);
+
+ /**
+ * constructor initializes
+ *
+ * @param settings
+ * @throws missingReqdSetting
+ * @throws invalidSettingValue
+ */
+ @Autowired
+ public DMaaPCambriaLimiter(@Qualifier("propertyReader") rrNvReadable settings) {
+ fRateInfo = new HashMap<>();
+ fMaxEmptyPollsPerMinute = settings.getDouble(CambriaConstants.kSetting_MaxEmptyPollsPerMinute,
+ CambriaConstants.kDefault_MaxEmptyPollsPerMinute);
+ fMaxPollsPerMinute = settings.getDouble(CambriaConstants.kSetting_MaxPollsPerMinute,
+ 30);
+ fWindowLengthMins = settings.getInt(CambriaConstants.kSetting_RateLimitWindowLength,
+ CambriaConstants.kDefault_RateLimitWindowLength);
+ fSleepMs = settings.getLong(CambriaConstants.kSetting_SleepMsOnRateLimit,
+ CambriaConstants.kDefault_SleepMsOnRateLimit);
+ fSleepMs1 = settings.getLong(CambriaConstants.kSetting_SleepMsRealOnRateLimit,
+ 5000);
+
+ }
+
+ /**
+ * Construct a rate limiter.
+ *
+ * @param maxEmptyPollsPerMinute
+ * Pass <= 0 to deactivate rate limiting.
+ * @param windowLengthMins
+ */
+ public DMaaPCambriaLimiter(double maxEmptyPollsPerMinute, double maxPollsPerMinute,int windowLengthMins) {
+ this(maxEmptyPollsPerMinute,maxPollsPerMinute, windowLengthMins, getSleepMsForRate(maxEmptyPollsPerMinute),getSleepMsForRate(1));
+ }
+
+ /**
+ * Construct a rate limiter
+ *
+ * @param maxEmptyPollsPerMinute
+ * Pass <= 0 to deactivate rate limiting.
+ * @param sleepMs
+ * @param windowLengthMins
+ */
+ public DMaaPCambriaLimiter(double maxEmptyPollsPerMinute,double maxPollsPerMinute, int windowLengthMins, long sleepMs ,long sleepMS1) {
+ fRateInfo = new HashMap<>();
+ fMaxEmptyPollsPerMinute = Math.max(0, maxEmptyPollsPerMinute);
+ fMaxPollsPerMinute = Math.max(0, maxPollsPerMinute);
+ fWindowLengthMins = windowLengthMins;
+ fSleepMs = Math.max(0, sleepMs);
+ fSleepMs1 = Math.max(0, sleepMS1);
+ }
+
+ /**
+ * static method provide the sleep time
+ *
+ * @param ratePerMinute
+ * @return
+ */
+ public static long getSleepMsForRate(double ratePerMinute) {
+ if (ratePerMinute <= 0.0)
+ return 0;
+ return Math.max(1000, Math.round(60 * 1000 / ratePerMinute));
+ }
+
+ /**
+ * Tell the rate limiter about a call to a topic/group/id. If the rate is
+ * too high, this call delays its return and throws an exception.
+ *
+ * @param topic
+ * @param consumerGroup
+ * @param clientId
+ * @throws CambriaApiException
+ */
+ public void onCall(String topic, String consumerGroup, String clientId,String remoteHost) throws CambriaApiException {
+ // do nothing if rate is configured 0 or less
+ if (fMaxEmptyPollsPerMinute <= 0) {
+ return;
+ }
+ // setup rate info for this tuple
+ final RateInfo ri = getRateInfo(topic, consumerGroup, clientId);
+ final double rate = ri.onCall();
+ log.info(ri.getLabel() + ": " + rate + " empty replies/minute.");
+ if (rate > fMaxEmptyPollsPerMinute) {
+ try {
+ log.warn(ri.getLabel() + ": " + rate + " empty replies/minute, limit is " + fMaxPollsPerMinute
+ + ".");
+ if (fSleepMs > 0) {
+ log.warn(ri.getLabel() + ": " + "Slowing response with " + fSleepMs
+ + " ms sleep, then responding in error.");
+ Thread.sleep(fSleepMs);
+
+ } else {
+ log.info(ri.getLabel() + ": " + "No sleep configured, just throwing error.");
+ }
+ } catch (InterruptedException e) {
+ log.error("Exception "+ e);
+ Thread.currentThread().interrupt();
+ }
+
+
+ ErrorResponse errRes = new ErrorResponse(HttpStatusCodes.k429_tooManyRequests,
+ DMaaPResponseCode.TOO_MANY_REQUESTS.getResponseCode(),
+ "This client is making too many requests. Please use a long poll "
+ + "setting to decrease the number of requests that result in empty responses. ","",Utils.getFormattedDate(new Date()),topic,"","",consumerGroup+"/"+clientId,remoteHost);
+
+ log.info(errRes.toString());
+ throw new CambriaApiException(errRes);
+ }
+
+
+ }
+
+ /**
+ *
+ * @param topic
+ * @param consumerGroup
+ * @param clientId
+ * @param sentCount
+ */
+ public void onSend(String topic, String consumerGroup, String clientId, long sentCount) {
+ // check for good replies
+ if (sentCount > 0) {
+ // that was a good send, reset the metric
+ getRateInfo(topic, consumerGroup, clientId).reset();
+ }
+ }
+
+ private static class RateInfo {
+ private final String fLabel;
+ private final CdmRateTicker fCallRateSinceLastMsgSend;
+ /**
+ * constructor initialzes
+ *
+ * @param label
+ * @param windowLengthMinutes
+ */
+ public RateInfo(String label, int windowLengthMinutes) {
+ fLabel = label;
+ fCallRateSinceLastMsgSend = new CdmRateTicker("Call rate since last msg send", 1, TimeUnit.MINUTES,
+ windowLengthMinutes, TimeUnit.MINUTES);
+ }
+
+ public String getLabel() {
+ return fLabel;
+ }
+
+ /**
+ * CdmRateTicker is reset
+ */
+ public void reset() {
+ fCallRateSinceLastMsgSend.reset();
+ }
+
+ /**
+ *
+ * @return
+ */
+ public double onCall() {
+ fCallRateSinceLastMsgSend.tick();
+ return fCallRateSinceLastMsgSend.getRate();
+ }
+ }
+
+
+
+
+
+
+
+ private RateInfo getRateInfo(String topic, String consumerGroup, String clientId) {
+ final String key = makeKey(topic, consumerGroup, clientId);
+ RateInfo ri = fRateInfo.get(key);
+ if (ri == null) {
+ ri = new RateInfo(key, fWindowLengthMins);
+ fRateInfo.put(key, ri);
+ }
+ return ri;
+ }
+
+
+
+
+
+
+
+ private String makeKey(String topic, String group, String id) {
+ return topic + "::" + group + "::" + id;
+ }
+}