summaryrefslogtreecommitdiffstats
path: root/src/main/java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java')
-rw-r--r--src/main/java/org/onap/dmaap/dmf/mr/backends/kafka/KafkaPublisher.java11
-rw-r--r--src/main/java/org/onap/dmaap/dmf/mr/beans/DMaaPCambriaLimiter.java39
-rw-r--r--src/main/java/org/onap/dmaap/dmf/mr/beans/DMaaPKafkaConsumerFactory.java2
-rw-r--r--src/main/java/org/onap/dmaap/dmf/mr/beans/DMaaPKafkaMetaBroker.java5
-rw-r--r--src/main/java/org/onap/dmaap/dmf/mr/service/impl/MMServiceImpl.java2
-rw-r--r--src/main/java/org/onap/dmaap/mr/filter/ContentLengthFilter.java21
-rw-r--r--src/main/java/org/onap/dmaap/mr/filter/DefaultLength.java6
7 files changed, 21 insertions, 65 deletions
diff --git a/src/main/java/org/onap/dmaap/dmf/mr/backends/kafka/KafkaPublisher.java b/src/main/java/org/onap/dmaap/dmf/mr/backends/kafka/KafkaPublisher.java
index 2a9e0ab..4bdd9f3 100644
--- a/src/main/java/org/onap/dmaap/dmf/mr/backends/kafka/KafkaPublisher.java
+++ b/src/main/java/org/onap/dmaap/dmf/mr/backends/kafka/KafkaPublisher.java
@@ -32,7 +32,7 @@ import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.json.JSONException;
import org.springframework.beans.factory.annotation.Qualifier;
-
+import org.springframework.util.StringUtils;
import org.onap.dmaap.dmf.mr.backends.Publisher;
import org.onap.dmaap.dmf.mr.constants.CambriaConstants;
import org.onap.dmaap.dmf.mr.utils.Utils;
@@ -61,15 +61,10 @@ public class KafkaPublisher implements Publisher {
* @throws rrNvReadable.missingReqdSetting
*/
public KafkaPublisher(@Qualifier("propertyReader") rrNvReadable settings) throws rrNvReadable.missingReqdSetting {
- //fSettings = settings;
final Properties props = new Properties();
- /*transferSetting(fSettings, props, "metadata.broker.list", "localhost:9092");
- transferSetting(fSettings, props, "request.required.acks", "1");
- transferSetting(fSettings, props, "message.send.max.retries", "5");
- transferSetting(fSettings, props, "retry.backoff.ms", "150"); */
String kafkaConnUrl= com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"kafka.metadata.broker.list");
- if(null==kafkaConnUrl){
+ if(StringUtils.isEmpty(kafkaConnUrl)){
kafkaConnUrl="localhost:9092";
}
@@ -209,7 +204,7 @@ try{
*/
private void transferSetting(Properties props, String key, String defVal) {
String kafka_prop= com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"kafka." + key);
- if (null==kafka_prop) kafka_prop=defVal;
+ if (StringUtils.isEmpty(kafka_prop)) kafka_prop=defVal;
//props.put(key, settings.getString("kafka." + key, defVal));
props.put(key, kafka_prop);
}
diff --git a/src/main/java/org/onap/dmaap/dmf/mr/beans/DMaaPCambriaLimiter.java b/src/main/java/org/onap/dmaap/dmf/mr/beans/DMaaPCambriaLimiter.java
index 2091e5f..f645c8d 100644
--- a/src/main/java/org/onap/dmaap/dmf/mr/beans/DMaaPCambriaLimiter.java
+++ b/src/main/java/org/onap/dmaap/dmf/mr/beans/DMaaPCambriaLimiter.java
@@ -52,7 +52,6 @@ import com.att.nsa.metrics.impl.CdmRateTicker;
@Component
public class DMaaPCambriaLimiter {
private final HashMap<String, RateInfo> fRateInfo;
- private final HashMap<String, RateInfoCheck> fRateInfoCheck;
private final double fMaxEmptyPollsPerMinute;
private final double fMaxPollsPerMinute;
private final int fWindowLengthMins;
@@ -70,7 +69,6 @@ public class DMaaPCambriaLimiter {
@Autowired
public DMaaPCambriaLimiter(@Qualifier("propertyReader") rrNvReadable settings) {
fRateInfo = new HashMap<>();
- fRateInfoCheck = new HashMap<>();
fMaxEmptyPollsPerMinute = settings.getDouble(CambriaConstants.kSetting_MaxEmptyPollsPerMinute,
CambriaConstants.kDefault_MaxEmptyPollsPerMinute);
fMaxPollsPerMinute = settings.getDouble(CambriaConstants.kSetting_MaxPollsPerMinute,
@@ -105,7 +103,6 @@ public class DMaaPCambriaLimiter {
*/
public DMaaPCambriaLimiter(double maxEmptyPollsPerMinute,double maxPollsPerMinute, int windowLengthMins, long sleepMs ,long sleepMS1) {
fRateInfo = new HashMap<>();
- fRateInfoCheck = new HashMap<>();
fMaxEmptyPollsPerMinute = Math.max(0, maxEmptyPollsPerMinute);
fMaxPollsPerMinute = Math.max(0, maxPollsPerMinute);
fWindowLengthMins = windowLengthMins;
@@ -226,42 +223,6 @@ public class DMaaPCambriaLimiter {
- private static class RateInfoCheck {
-
- private final String fLabel;
- private final CdmRateTicker fCallRateSinceLastMsgSend;
- /**
- * constructor initialzes
- *
- * @param label
- * @param windowLengthMinutes
- */
- public RateInfoCheck(String label, int windowLengthMinutes) {
- fLabel = label;
- fCallRateSinceLastMsgSend = new CdmRateTicker("Call rate since last msg send", 1, TimeUnit.MINUTES,
- windowLengthMinutes, TimeUnit.MINUTES);
- }
-
- public String getLabel() {
- return fLabel;
- }
-
- /**
- * CdmRateTicker is reset
- */
- public void reset() {
- fCallRateSinceLastMsgSend.reset();
- }
-
- /**
- *
- * @return
- */
- public double onCall() {
- fCallRateSinceLastMsgSend.tick();
- return fCallRateSinceLastMsgSend.getRate();
- }
- }
diff --git a/src/main/java/org/onap/dmaap/dmf/mr/beans/DMaaPKafkaConsumerFactory.java b/src/main/java/org/onap/dmaap/dmf/mr/beans/DMaaPKafkaConsumerFactory.java
index 0a909ff..63e065f 100644
--- a/src/main/java/org/onap/dmaap/dmf/mr/beans/DMaaPKafkaConsumerFactory.java
+++ b/src/main/java/org/onap/dmaap/dmf/mr/beans/DMaaPKafkaConsumerFactory.java
@@ -221,7 +221,7 @@ public class DMaaPKafkaConsumerFactory implements ConsumerFactory {
try {
ipLock.release();
} catch (Exception e) {
- throw new UnavailableException("Error while releasing consumer factory lock" + e, e);
+ log.error("Error while releasing consumer factory lock", e);
}
}
}
diff --git a/src/main/java/org/onap/dmaap/dmf/mr/beans/DMaaPKafkaMetaBroker.java b/src/main/java/org/onap/dmaap/dmf/mr/beans/DMaaPKafkaMetaBroker.java
index 1e20ee2..03a1bd5 100644
--- a/src/main/java/org/onap/dmaap/dmf/mr/beans/DMaaPKafkaMetaBroker.java
+++ b/src/main/java/org/onap/dmaap/dmf/mr/beans/DMaaPKafkaMetaBroker.java
@@ -39,7 +39,7 @@ import org.apache.kafka.common.KafkaFuture;
import org.json.JSONObject;
import org.json.JSONArray;
import org.springframework.beans.factory.annotation.Qualifier;
-
+import org.springframework.util.StringUtils;
import org.onap.dmaap.dmf.mr.CambriaApiException;
import org.onap.dmaap.dmf.mr.constants.CambriaConstants;
import org.onap.dmaap.dmf.mr.metabroker.Broker1;
@@ -53,6 +53,7 @@ import com.att.nsa.configs.ConfigDb;
import com.att.nsa.configs.ConfigDbException;
import com.att.nsa.configs.ConfigPath;
import com.att.nsa.drumlin.service.standards.HttpStatusCodes;
+import com.att.nsa.drumlin.till.data.stringUtils;
import com.att.nsa.drumlin.till.nv.rrNvReadable;
import com.att.nsa.security.NsaAcl;
import com.att.nsa.security.NsaAclUtils;
@@ -75,7 +76,7 @@ public class DMaaPKafkaMetaBroker implements Broker1 {
final Properties props = new Properties ();
String fkafkaBrokers = com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
"kafka.metadata.broker.list");
- if (null == fkafkaBrokers) {
+ if (StringUtils.isEmpty(fkafkaBrokers)) {
fkafkaBrokers = "localhost:9092";
}
diff --git a/src/main/java/org/onap/dmaap/dmf/mr/service/impl/MMServiceImpl.java b/src/main/java/org/onap/dmaap/dmf/mr/service/impl/MMServiceImpl.java
index 82ff80a..0ab80c4 100644
--- a/src/main/java/org/onap/dmaap/dmf/mr/service/impl/MMServiceImpl.java
+++ b/src/main/java/org/onap/dmaap/dmf/mr/service/impl/MMServiceImpl.java
@@ -435,7 +435,7 @@ public class MMServiceImpl implements MMService {
// start processing, building a batch to push to the backend
final long startMs = System.currentTimeMillis();
long count = 0;
- long maxEventBatch = 1024 * 16;
+ long maxEventBatch = 1024L * 16L;
String evenlen = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, BATCH_LENGTH);
if (null != evenlen)
maxEventBatch = Long.parseLong(evenlen);
diff --git a/src/main/java/org/onap/dmaap/mr/filter/ContentLengthFilter.java b/src/main/java/org/onap/dmaap/mr/filter/ContentLengthFilter.java
index a175b16..dbf4246 100644
--- a/src/main/java/org/onap/dmaap/mr/filter/ContentLengthFilter.java
+++ b/src/main/java/org/onap/dmaap/mr/filter/ContentLengthFilter.java
@@ -74,8 +74,6 @@ public class ContentLengthFilter implements Filter {
*/
public void doFilter(ServletRequest req, ServletResponse res, FilterChain chain) throws IOException,
ServletException {
- // TODO Auto-generated method stub
- // place your code here
log.info("inside servlet do filter content length checking before pub/sub");
HttpServletRequest request = (HttpServletRequest) req;
JSONObject jsonObj = null;
@@ -105,12 +103,14 @@ public class ContentLengthFilter implements Filter {
chain.doFilter(req, res);
}
} catch (CambriaApiException | NumberFormatException e) {
- log.error("message size is greater then default");
- ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_EXPECTATION_FAILED,
- DMaaPResponseCode.MSG_SIZE_EXCEEDS_MSG_LIMIT.getResponseCode(), errorMessages.getMsgSizeExceeds()
- + jsonObj.toString());
- log.info(errRes.toString());
-
+ log.error("message size is greater then default", e);
+ if (jsonObj != null) {
+ ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_EXPECTATION_FAILED,
+ DMaaPResponseCode.MSG_SIZE_EXCEEDS_MSG_LIMIT.getResponseCode(),
+ errorMessages.getMsgSizeExceeds()
+ + jsonObj.toString());
+ log.info(errRes.toString());
+ }
}
}
@@ -119,14 +119,13 @@ public class ContentLengthFilter implements Filter {
* @see Filter#init(FilterConfig)
*/
public void init(FilterConfig fConfig) throws ServletException {
- // TODO Auto-generated method stub
this.filterConfig = fConfig;
log.info("Filter Content Length Initialize");
ApplicationContext ctx = WebApplicationContextUtils.getRequiredWebApplicationContext(fConfig
.getServletContext());
DefaultLength defLength = (DefaultLength) ctx.getBean("defLength");
- DMaaPErrorMessages errorMessages = (DMaaPErrorMessages) ctx.getBean("DMaaPErrorMessages");
- this.errorMessages = errorMessages;
+ DMaaPErrorMessages errMessages = (DMaaPErrorMessages) ctx.getBean("DMaaPErrorMessages");
+ this.errorMessages = errMessages;
this.defaultLength = defLength;
}
diff --git a/src/main/java/org/onap/dmaap/mr/filter/DefaultLength.java b/src/main/java/org/onap/dmaap/mr/filter/DefaultLength.java
index 598ef1b..3425823 100644
--- a/src/main/java/org/onap/dmaap/mr/filter/DefaultLength.java
+++ b/src/main/java/org/onap/dmaap/mr/filter/DefaultLength.java
@@ -24,14 +24,14 @@ package org.onap.dmaap.mr.filter;
public class DefaultLength {
- String defaultLength;
+ String defLength;
public String getDefaultLength() {
- return defaultLength;
+ return defLength;
}
public void setDefaultLength(String defaultLength) {
- this.defaultLength = defaultLength;
+ this.defLength = defaultLength;
}
}