summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--src/main/java/com/att/dmf/mr/backends/kafka/Kafka011Consumer.java2
-rw-r--r--src/main/java/com/att/dmf/mr/backends/kafka/Kafka011ConsumerUtil.java2
-rw-r--r--src/main/java/com/att/dmf/mr/backends/kafka/KafkaPublisher.java3
-rw-r--r--src/main/java/com/att/dmf/mr/beans/DMaaPKafkaConsumerFactory.java6
-rw-r--r--src/main/java/com/att/dmf/mr/beans/DMaaPKafkaMetaBroker.java5
-rw-r--r--src/main/java/com/att/dmf/mr/metrics/publisher/DMaaPCambriaClientFactory.java7
-rw-r--r--src/main/java/com/att/dmf/mr/metrics/publisher/impl/DMaaPCambriaSimplerBatchPublisher.java8
-rw-r--r--src/main/java/com/att/dmf/mr/service/UIService.java4
-rw-r--r--src/main/java/com/att/dmf/mr/service/impl/MMServiceImpl.java8
-rw-r--r--src/main/java/com/att/dmf/mr/service/impl/TopicServiceImpl.java2
-rw-r--r--src/main/java/com/att/dmf/mr/service/impl/UIServiceImpl.java9
-rw-r--r--src/main/java/com/att/dmf/mr/utils/ConfigurationReader.java12
-rw-r--r--src/main/java/com/att/dmf/mr/utils/Emailer.java5
-rw-r--r--src/main/java/com/att/dmf/mr/utils/Utils.java20
-rw-r--r--src/main/resources/kafka.properties1
15 files changed, 62 insertions, 32 deletions
diff --git a/src/main/java/com/att/dmf/mr/backends/kafka/Kafka011Consumer.java b/src/main/java/com/att/dmf/mr/backends/kafka/Kafka011Consumer.java
index 6cd0230..9be9073 100644
--- a/src/main/java/com/att/dmf/mr/backends/kafka/Kafka011Consumer.java
+++ b/src/main/java/com/att/dmf/mr/backends/kafka/Kafka011Consumer.java
@@ -156,7 +156,7 @@ public class Kafka011Consumer implements Consumer {
apiNodeId = InetAddress.getLocalHost().getCanonicalHostName() + ":" + CambriaConstants.kDefault_Port;
} catch (UnknownHostException e1) {
// TODO Auto-generated catch block
- e1.printStackTrace();
+ log.error("unable to get the localhost address");
}
try {
diff --git a/src/main/java/com/att/dmf/mr/backends/kafka/Kafka011ConsumerUtil.java b/src/main/java/com/att/dmf/mr/backends/kafka/Kafka011ConsumerUtil.java
index 271b155..e066df5 100644
--- a/src/main/java/com/att/dmf/mr/backends/kafka/Kafka011ConsumerUtil.java
+++ b/src/main/java/com/att/dmf/mr/backends/kafka/Kafka011ConsumerUtil.java
@@ -73,7 +73,6 @@ public class Kafka011ConsumerUtil {
} catch (Exception e) {
log.error("Failed and go to Exception block for " + fGroup + " " + e.getMessage());
- e.printStackTrace();
}
}
});
@@ -112,7 +111,6 @@ public class Kafka011ConsumerUtil {
log.error("Error occurs for " + e);
} catch (Exception e) {
log.error("Failed and go to Exception block for " + group + " " + e.getMessage());
- e.printStackTrace();
}
}
});
diff --git a/src/main/java/com/att/dmf/mr/backends/kafka/KafkaPublisher.java b/src/main/java/com/att/dmf/mr/backends/kafka/KafkaPublisher.java
index c08d909..f5751f0 100644
--- a/src/main/java/com/att/dmf/mr/backends/kafka/KafkaPublisher.java
+++ b/src/main/java/com/att/dmf/mr/backends/kafka/KafkaPublisher.java
@@ -35,6 +35,7 @@ import org.springframework.beans.factory.annotation.Qualifier;
import com.att.dmf.mr.backends.Publisher;
import com.att.dmf.mr.constants.CambriaConstants;
+import com.att.dmf.mr.utils.Utils;
//import org.slf4j.Logger;
//import org.slf4j.LoggerFactory;
import com.att.eelf.configuration.EELFLogger;
@@ -75,7 +76,7 @@ public class KafkaPublisher implements Publisher {
- transferSetting( props, "sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='admin_secret';");
+ transferSetting( props, "sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='"+Utils.getKafkaproperty()+"';");
transferSetting( props, "security.protocol", "SASL_PLAINTEXT");
transferSetting( props, "sasl.mechanism", "PLAIN");
transferSetting( props, "bootstrap.servers",kafkaConnUrl);
diff --git a/src/main/java/com/att/dmf/mr/beans/DMaaPKafkaConsumerFactory.java b/src/main/java/com/att/dmf/mr/beans/DMaaPKafkaConsumerFactory.java
index 74f4ef6..2f13be8 100644
--- a/src/main/java/com/att/dmf/mr/beans/DMaaPKafkaConsumerFactory.java
+++ b/src/main/java/com/att/dmf/mr/beans/DMaaPKafkaConsumerFactory.java
@@ -46,8 +46,7 @@ import com.att.dmf.mr.backends.kafka.KafkaLiveLockAvoider2;
import com.att.dmf.mr.backends.kafka.LiveLockAvoidance;
import com.att.dmf.mr.constants.CambriaConstants;
import com.att.dmf.mr.utils.ConfigurationReader;
-
-
+import com.att.dmf.mr.utils.Utils;
import com.att.eelf.configuration.EELFLogger;
import com.att.eelf.configuration.EELFManager;
import com.att.nsa.drumlin.till.nv.rrNvReadable.missingReqdSetting;
@@ -290,8 +289,7 @@ public class DMaaPKafkaConsumerFactory implements ConsumerFactory {
props.put("group.id", fakeGroupName);
props.put("enable.auto.commit", "false"); // 0.11
props.put("bootstrap.servers", fkafkaBrokers);
- props.put("sasl.jaas.config",
- "org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='admin_secret';");
+ props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='"+Utils.getKafkaproperty()+"';");
props.put("security.protocol", "SASL_PLAINTEXT");
props.put("sasl.mechanism", "PLAIN");
diff --git a/src/main/java/com/att/dmf/mr/beans/DMaaPKafkaMetaBroker.java b/src/main/java/com/att/dmf/mr/beans/DMaaPKafkaMetaBroker.java
index c89a3b2..cbb58e4 100644
--- a/src/main/java/com/att/dmf/mr/beans/DMaaPKafkaMetaBroker.java
+++ b/src/main/java/com/att/dmf/mr/beans/DMaaPKafkaMetaBroker.java
@@ -47,6 +47,7 @@ import com.att.dmf.mr.metabroker.Broker;
import com.att.dmf.mr.metabroker.Broker1;
import com.att.dmf.mr.metabroker.Topic;
import com.att.dmf.mr.utils.ConfigurationReader;
+import com.att.dmf.mr.utils.Utils;
//import org.apache.log4-j.Logger;
import com.att.eelf.configuration.EELFLogger;
import com.att.eelf.configuration.EELFManager;
@@ -83,7 +84,7 @@ public class DMaaPKafkaMetaBroker implements Broker1 {
}
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, fkafkaBrokers );
- props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='admin_secret';");
+ props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='"+Utils.getKafkaproperty()+"';");
props.put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
props.put("sasl.mechanism", "PLAIN");
@@ -119,7 +120,7 @@ public class DMaaPKafkaMetaBroker implements Broker1 {
}
- props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='admin_secret';");
+ props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='"+Utils.getKafkaproperty()+"';");
props.put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
props.put("sasl.mechanism", "PLAIN");
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, fkafkaBrokers );
diff --git a/src/main/java/com/att/dmf/mr/metrics/publisher/DMaaPCambriaClientFactory.java b/src/main/java/com/att/dmf/mr/metrics/publisher/DMaaPCambriaClientFactory.java
index 9158c96..d7818de 100644
--- a/src/main/java/com/att/dmf/mr/metrics/publisher/DMaaPCambriaClientFactory.java
+++ b/src/main/java/com/att/dmf/mr/metrics/publisher/DMaaPCambriaClientFactory.java
@@ -22,6 +22,7 @@
package com.att.dmf.mr.metrics.publisher;
import java.net.MalformedURLException;
+import java.nio.channels.NotYetConnectedException;
import java.util.Collection;
import java.util.TreeSet;
import java.util.UUID;
@@ -244,7 +245,11 @@ public class DMaaPCambriaClientFactory {
return new DMaaPCambriaConsumerImpl(hostSet, topic, consumerGroup,
consumerId, timeoutMs, limit, filter, apiKey, apiSecret);
} catch (MalformedURLException e) {
- throw new RuntimeException(e);
+
+ NotYetConnectedException exception=new NotYetConnectedException();
+ exception.setStackTrace(e.getStackTrace());
+
+ throw exception ;
}
}
diff --git a/src/main/java/com/att/dmf/mr/metrics/publisher/impl/DMaaPCambriaSimplerBatchPublisher.java b/src/main/java/com/att/dmf/mr/metrics/publisher/impl/DMaaPCambriaSimplerBatchPublisher.java
index dee9e57..e9b1cdb 100644
--- a/src/main/java/com/att/dmf/mr/metrics/publisher/impl/DMaaPCambriaSimplerBatchPublisher.java
+++ b/src/main/java/com/att/dmf/mr/metrics/publisher/impl/DMaaPCambriaSimplerBatchPublisher.java
@@ -25,6 +25,7 @@ import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.net.MalformedURLException;
+import java.nio.channels.NotYetConnectedException;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
@@ -123,7 +124,12 @@ public class DMaaPCambriaSimplerBatchPublisher extends CambriaBaseClient
try {
return new DMaaPCambriaSimplerBatchPublisher(fUrls, fTopic, fMaxBatchSize, fMaxBatchAgeMs, fCompress);
} catch (MalformedURLException e) {
- throw new RuntimeException(e);
+
+ NotYetConnectedException exception=new NotYetConnectedException();
+ exception.setStackTrace(e.getStackTrace());
+
+ throw exception ;
+
}
}
diff --git a/src/main/java/com/att/dmf/mr/service/UIService.java b/src/main/java/com/att/dmf/mr/service/UIService.java
index c62b3ff..1155a2a 100644
--- a/src/main/java/com/att/dmf/mr/service/UIService.java
+++ b/src/main/java/com/att/dmf/mr/service/UIService.java
@@ -27,7 +27,9 @@ package com.att.dmf.mr.service;
import java.io.IOException;
import org.apache.kafka.common.errors.TopicExistsException;
+import org.json.JSONException;
+import com.att.dmf.mr.CambriaApiException;
import com.att.dmf.mr.beans.DMaaPContext;
import com.att.nsa.configs.ConfigDbException;
/**
@@ -62,7 +64,7 @@ public interface UIService {
* @throws Exception
*/
void getApiKey(DMaaPContext dmaapContext, final String apiKey)
- throws Exception;
+ throws CambriaApiException, ConfigDbException, JSONException, IOException;
/**
* Fetching list of all the topics and returning in a templated form for
diff --git a/src/main/java/com/att/dmf/mr/service/impl/MMServiceImpl.java b/src/main/java/com/att/dmf/mr/service/impl/MMServiceImpl.java
index e55f510..387d8b1 100644
--- a/src/main/java/com/att/dmf/mr/service/impl/MMServiceImpl.java
+++ b/src/main/java/com/att/dmf/mr/service/impl/MMServiceImpl.java
@@ -404,7 +404,7 @@ public class MMServiceImpl implements MMService {
int status = HttpStatus.SC_NOT_FOUND;
String errorMsg = null;
- if (excp instanceof CambriaApiException) {
+ if (excp.getClass().toString().contains("CambriaApiException")) {
status = ((CambriaApiException) excp).getStatus();
JSONTokener jsonTokener = new JSONTokener(((CambriaApiException) excp).getBody());
JSONObject errObject = new JSONObject(jsonTokener);
@@ -496,7 +496,7 @@ public class MMServiceImpl implements MMService {
int status = HttpStatus.SC_NOT_FOUND;
String errorMsg = null;
- if (excp instanceof CambriaApiException) {
+ if (excp.getClass().toString().contains("CambriaApiException")) {
status = ((CambriaApiException) excp).getStatus();
JSONTokener jsonTokener = new JSONTokener(((CambriaApiException) excp).getBody());
JSONObject errObject = new JSONObject(jsonTokener);
@@ -541,7 +541,7 @@ public class MMServiceImpl implements MMService {
} catch (Exception excp) {
int status = HttpStatus.SC_NOT_FOUND;
String errorMsg = null;
- if (excp instanceof CambriaApiException) {
+ if (excp.getClass().toString().contains("CambriaApiException")) {
status = ((CambriaApiException) excp).getStatus();
JSONTokener jsonTokener = new JSONTokener(((CambriaApiException) excp).getBody());
JSONObject errObject = new JSONObject(jsonTokener);
@@ -581,7 +581,7 @@ public class MMServiceImpl implements MMService {
} catch (Exception excp) {
int status = HttpStatus.SC_NOT_FOUND;
String errorMsg = null;
- if (excp instanceof CambriaApiException) {
+ if (excp.getClass().toString().contains("CambriaApiException")) {
status = ((CambriaApiException) excp).getStatus();
JSONTokener jsonTokener = new JSONTokener(((CambriaApiException) excp).getBody());
JSONObject errObject = new JSONObject(jsonTokener);
diff --git a/src/main/java/com/att/dmf/mr/service/impl/TopicServiceImpl.java b/src/main/java/com/att/dmf/mr/service/impl/TopicServiceImpl.java
index 4c872ad..f6d7b21 100644
--- a/src/main/java/com/att/dmf/mr/service/impl/TopicServiceImpl.java
+++ b/src/main/java/com/att/dmf/mr/service/impl/TopicServiceImpl.java
@@ -314,7 +314,7 @@ public class TopicServiceImpl implements TopicService {
throw new CambriaApiException(errRes);
} catch (com.att.dmf.mr.metabroker.Broker1.TopicExistsException e) {
// TODO Auto-generated catch block
- e.printStackTrace();
+ LOGGER.error( e.getMessage());
}
}
diff --git a/src/main/java/com/att/dmf/mr/service/impl/UIServiceImpl.java b/src/main/java/com/att/dmf/mr/service/impl/UIServiceImpl.java
index b624a39..73ad83b 100644
--- a/src/main/java/com/att/dmf/mr/service/impl/UIServiceImpl.java
+++ b/src/main/java/com/att/dmf/mr/service/impl/UIServiceImpl.java
@@ -30,9 +30,11 @@ import java.util.Map.Entry;
import org.apache.kafka.common.errors.TopicExistsException;
import org.json.JSONArray;
+import org.json.JSONException;
import org.json.JSONObject;
import org.springframework.stereotype.Service;
+import com.att.dmf.mr.CambriaApiException;
import com.att.dmf.mr.beans.DMaaPContext;
import com.att.dmf.mr.beans.DMaaPKafkaMetaBroker;
import com.att.dmf.mr.metabroker.Topic;
@@ -100,10 +102,13 @@ public class UIServiceImpl implements UIService {
/**
* @param dmaapContext
* @param apiKey
+ * @throws ConfigDbException
+ * @throws IOException
+ * @throws JSONException
* @throws Exception
*/
@Override
- public void getApiKey(DMaaPContext dmaapContext, String apiKey) throws Exception {
+ public void getApiKey(DMaaPContext dmaapContext, String apiKey) throws CambriaApiException, ConfigDbException, JSONException, IOException {
// TODO - We need to work on the templates and how data will be set in
// the template
LOGGER.info("Fetching detials of apikey: " + apiKey);
@@ -114,7 +119,7 @@ public class UIServiceImpl implements UIService {
DMaaPResponseBuilder.respondOk(dmaapContext, key.asJsonObject());
} else {
LOGGER.info("Details of apikey [" + apiKey + "] not found. Returning response");
- throw new Exception("Key [" + apiKey + "] not found.");
+ throw new CambriaApiException(400,"Key [" + apiKey + "] not found.");
}
}
diff --git a/src/main/java/com/att/dmf/mr/utils/ConfigurationReader.java b/src/main/java/com/att/dmf/mr/utils/ConfigurationReader.java
index 17de391..aebca34 100644
--- a/src/main/java/com/att/dmf/mr/utils/ConfigurationReader.java
+++ b/src/main/java/com/att/dmf/mr/utils/ConfigurationReader.java
@@ -112,6 +112,7 @@ public class ConfigurationReader {
* @throws ServletException
* @throws KafkaConsumerCacheException
* @throws ConfigDbException
+ * @throws KeyExistsException
*/
@Autowired
public ConfigurationReader(@Qualifier("propertyReader") rrNvReadable settings,
@@ -128,7 +129,7 @@ public class ConfigurationReader {
*/
@Qualifier("dMaaPAuthenticatorImpl") DMaaPAuthenticator<NsaSimpleApiKey> fSecurityManager
)
- throws missingReqdSetting, invalidSettingValue, ServletException, KafkaConsumerCacheException, ConfigDbException {
+ throws missingReqdSetting, invalidSettingValue, ServletException, KafkaConsumerCacheException, ConfigDbException, KeyExistsException {
this.fMetrics = fMetrics;
this.zk = zk;
@@ -158,7 +159,7 @@ public class ConfigurationReader {
}
protected void servletSetup()
- throws rrNvReadable.missingReqdSetting, rrNvReadable.invalidSettingValue, ServletException, ConfigDbException {
+ throws rrNvReadable.missingReqdSetting, rrNvReadable.invalidSettingValue, ServletException, ConfigDbException, KeyExistsException {
try {
fMetrics.toJson();
@@ -169,19 +170,12 @@ public class ConfigurationReader {
if ( adminSecret != null && adminSecret.length () > 0 )
{
- try
- {
final NsaApiDb<NsaSimpleApiKey> adminDb = new BaseNsaApiDbImpl<NsaSimpleApiKey> ( new MemConfigDb(), new NsaSimpleApiKeyFactory() );
adminDb.createApiKey ( "admin", adminSecret );
fSecurityManager.addAuthenticator ( new DMaaPOriginalUebAuthenticator<NsaSimpleApiKey> ( adminDb, 10*60*1000 ) );
- }
- catch ( KeyExistsException e )
- {
- throw new RuntimeException ( "This key can't exist in a fresh in-memory DB!", e );
- }
}
// setup a backend
diff --git a/src/main/java/com/att/dmf/mr/utils/Emailer.java b/src/main/java/com/att/dmf/mr/utils/Emailer.java
index a940abf..1b68216 100644
--- a/src/main/java/com/att/dmf/mr/utils/Emailer.java
+++ b/src/main/java/com/att/dmf/mr/utils/Emailer.java
@@ -89,7 +89,6 @@ public class Emailer
private static final EELFLogger log = EELFManager.getInstance().getLogger(Emailer.class);
public static final String kSetting_MailAuthUser = "mailLogin";
- public static final String kSetting_MailAuthPwd = "mailPassword";
public static final String kSetting_MailFromEmail = "mailFromEmail";
public static final String kSetting_MailFromName = "mailFromName";
public static final String kSetting_SmtpServer = "mailSmtpServer";
@@ -162,14 +161,14 @@ public class Emailer
makeSetting ( prop, "mail.smtp.starttls.enable", kSetting_SmtpServerSsl, true );
final String un = getSetting ( kSetting_MailAuthUser, "" );
- final String pw = getSetting ( kSetting_MailAuthPwd, "" );
+ final String value=(AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"mailPassword")!=null)?AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"mailPassword"):"";
final Session session = Session.getInstance ( prop,
new javax.mail.Authenticator()
{
@Override
protected PasswordAuthentication getPasswordAuthentication()
{
- return new PasswordAuthentication ( un, pw );
+ return new PasswordAuthentication ( un, value );
}
}
);
diff --git a/src/main/java/com/att/dmf/mr/utils/Utils.java b/src/main/java/com/att/dmf/mr/utils/Utils.java
index 70691cf..c2b8b88 100644
--- a/src/main/java/com/att/dmf/mr/utils/Utils.java
+++ b/src/main/java/com/att/dmf/mr/utils/Utils.java
@@ -21,16 +21,22 @@
*******************************************************************************/
package com.att.dmf.mr.utils;
+import java.io.IOException;
+import java.io.InputStream;
import java.text.DecimalFormat;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Enumeration;
import java.util.LinkedList;
import java.util.List;
+import java.util.Properties;
import javax.servlet.http.HttpServletRequest;
+import com.att.dmf.mr.backends.kafka.KafkaPublisher;
import com.att.dmf.mr.beans.DMaaPContext;
+import com.att.eelf.configuration.EELFLogger;
+import com.att.eelf.configuration.EELFManager;
/**
* This is an utility class for various operations for formatting
* @author nilanjana.maity
@@ -41,6 +47,7 @@ public class Utils {
private static final String DATE_FORMAT = "dd-MM-yyyy::hh:mm:ss:SSS";
public static final String CAMBRIA_AUTH_HEADER = "X-CambriaAuth";
private static final String BATCH_ID_FORMAT = "000000";
+ private static final EELFLogger log = EELFManager.getInstance().getLogger(Utils.class);
private Utils() {
super();
@@ -142,4 +149,17 @@ public class Utils {
}
return list;
}
+
+ public static String getKafkaproperty(){
+ InputStream input = new Utils().getClass().getResourceAsStream("/kafka.properties");
+ Properties props = new Properties();
+ try {
+ props.load(input);
+ } catch (IOException e) {
+ log.error("failed to read kafka.properties");
+ }
+ return props.getProperty("key");
+
+
+ }
}
diff --git a/src/main/resources/kafka.properties b/src/main/resources/kafka.properties
new file mode 100644
index 0000000..4a0eddb
--- /dev/null
+++ b/src/main/resources/kafka.properties
@@ -0,0 +1 @@
+key=admin_secret; \ No newline at end of file