diff options
15 files changed, 62 insertions, 32 deletions
diff --git a/src/main/java/com/att/dmf/mr/backends/kafka/Kafka011Consumer.java b/src/main/java/com/att/dmf/mr/backends/kafka/Kafka011Consumer.java index 6cd0230..9be9073 100644 --- a/src/main/java/com/att/dmf/mr/backends/kafka/Kafka011Consumer.java +++ b/src/main/java/com/att/dmf/mr/backends/kafka/Kafka011Consumer.java @@ -156,7 +156,7 @@ public class Kafka011Consumer implements Consumer { apiNodeId = InetAddress.getLocalHost().getCanonicalHostName() + ":" + CambriaConstants.kDefault_Port; } catch (UnknownHostException e1) { // TODO Auto-generated catch block - e1.printStackTrace(); + log.error("unable to get the localhost address"); } try { diff --git a/src/main/java/com/att/dmf/mr/backends/kafka/Kafka011ConsumerUtil.java b/src/main/java/com/att/dmf/mr/backends/kafka/Kafka011ConsumerUtil.java index 271b155..e066df5 100644 --- a/src/main/java/com/att/dmf/mr/backends/kafka/Kafka011ConsumerUtil.java +++ b/src/main/java/com/att/dmf/mr/backends/kafka/Kafka011ConsumerUtil.java @@ -73,7 +73,6 @@ public class Kafka011ConsumerUtil { } catch (Exception e) { log.error("Failed and go to Exception block for " + fGroup + " " + e.getMessage()); - e.printStackTrace(); } } }); @@ -112,7 +111,6 @@ public class Kafka011ConsumerUtil { log.error("Error occurs for " + e); } catch (Exception e) { log.error("Failed and go to Exception block for " + group + " " + e.getMessage()); - e.printStackTrace(); } } }); diff --git a/src/main/java/com/att/dmf/mr/backends/kafka/KafkaPublisher.java b/src/main/java/com/att/dmf/mr/backends/kafka/KafkaPublisher.java index c08d909..f5751f0 100644 --- a/src/main/java/com/att/dmf/mr/backends/kafka/KafkaPublisher.java +++ b/src/main/java/com/att/dmf/mr/backends/kafka/KafkaPublisher.java @@ -35,6 +35,7 @@ import org.springframework.beans.factory.annotation.Qualifier; import com.att.dmf.mr.backends.Publisher; import com.att.dmf.mr.constants.CambriaConstants; +import com.att.dmf.mr.utils.Utils; //import org.slf4j.Logger; //import org.slf4j.LoggerFactory; import com.att.eelf.configuration.EELFLogger; @@ -75,7 +76,7 @@ public class KafkaPublisher implements Publisher { - transferSetting( props, "sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='admin_secret';"); + transferSetting( props, "sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='"+Utils.getKafkaproperty()+"';"); transferSetting( props, "security.protocol", "SASL_PLAINTEXT"); transferSetting( props, "sasl.mechanism", "PLAIN"); transferSetting( props, "bootstrap.servers",kafkaConnUrl); diff --git a/src/main/java/com/att/dmf/mr/beans/DMaaPKafkaConsumerFactory.java b/src/main/java/com/att/dmf/mr/beans/DMaaPKafkaConsumerFactory.java index 74f4ef6..2f13be8 100644 --- a/src/main/java/com/att/dmf/mr/beans/DMaaPKafkaConsumerFactory.java +++ b/src/main/java/com/att/dmf/mr/beans/DMaaPKafkaConsumerFactory.java @@ -46,8 +46,7 @@ import com.att.dmf.mr.backends.kafka.KafkaLiveLockAvoider2; import com.att.dmf.mr.backends.kafka.LiveLockAvoidance; import com.att.dmf.mr.constants.CambriaConstants; import com.att.dmf.mr.utils.ConfigurationReader; - - +import com.att.dmf.mr.utils.Utils; import com.att.eelf.configuration.EELFLogger; import com.att.eelf.configuration.EELFManager; import com.att.nsa.drumlin.till.nv.rrNvReadable.missingReqdSetting; @@ -290,8 +289,7 @@ public class DMaaPKafkaConsumerFactory implements ConsumerFactory { props.put("group.id", fakeGroupName); props.put("enable.auto.commit", "false"); // 0.11 props.put("bootstrap.servers", fkafkaBrokers); - props.put("sasl.jaas.config", - "org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='admin_secret';"); + props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='"+Utils.getKafkaproperty()+"';"); props.put("security.protocol", "SASL_PLAINTEXT"); props.put("sasl.mechanism", "PLAIN"); diff --git a/src/main/java/com/att/dmf/mr/beans/DMaaPKafkaMetaBroker.java b/src/main/java/com/att/dmf/mr/beans/DMaaPKafkaMetaBroker.java index c89a3b2..cbb58e4 100644 --- a/src/main/java/com/att/dmf/mr/beans/DMaaPKafkaMetaBroker.java +++ b/src/main/java/com/att/dmf/mr/beans/DMaaPKafkaMetaBroker.java @@ -47,6 +47,7 @@ import com.att.dmf.mr.metabroker.Broker; import com.att.dmf.mr.metabroker.Broker1; import com.att.dmf.mr.metabroker.Topic; import com.att.dmf.mr.utils.ConfigurationReader; +import com.att.dmf.mr.utils.Utils; //import org.apache.log4-j.Logger; import com.att.eelf.configuration.EELFLogger; import com.att.eelf.configuration.EELFManager; @@ -83,7 +84,7 @@ public class DMaaPKafkaMetaBroker implements Broker1 { } props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, fkafkaBrokers ); - props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='admin_secret';"); + props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='"+Utils.getKafkaproperty()+"';"); props.put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT"); props.put("sasl.mechanism", "PLAIN"); @@ -119,7 +120,7 @@ public class DMaaPKafkaMetaBroker implements Broker1 { } - props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='admin_secret';"); + props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='"+Utils.getKafkaproperty()+"';"); props.put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT"); props.put("sasl.mechanism", "PLAIN"); props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, fkafkaBrokers ); diff --git a/src/main/java/com/att/dmf/mr/metrics/publisher/DMaaPCambriaClientFactory.java b/src/main/java/com/att/dmf/mr/metrics/publisher/DMaaPCambriaClientFactory.java index 9158c96..d7818de 100644 --- a/src/main/java/com/att/dmf/mr/metrics/publisher/DMaaPCambriaClientFactory.java +++ b/src/main/java/com/att/dmf/mr/metrics/publisher/DMaaPCambriaClientFactory.java @@ -22,6 +22,7 @@ package com.att.dmf.mr.metrics.publisher; import java.net.MalformedURLException; +import java.nio.channels.NotYetConnectedException; import java.util.Collection; import java.util.TreeSet; import java.util.UUID; @@ -244,7 +245,11 @@ public class DMaaPCambriaClientFactory { return new DMaaPCambriaConsumerImpl(hostSet, topic, consumerGroup, consumerId, timeoutMs, limit, filter, apiKey, apiSecret); } catch (MalformedURLException e) { - throw new RuntimeException(e); + + NotYetConnectedException exception=new NotYetConnectedException(); + exception.setStackTrace(e.getStackTrace()); + + throw exception ; } } diff --git a/src/main/java/com/att/dmf/mr/metrics/publisher/impl/DMaaPCambriaSimplerBatchPublisher.java b/src/main/java/com/att/dmf/mr/metrics/publisher/impl/DMaaPCambriaSimplerBatchPublisher.java index dee9e57..e9b1cdb 100644 --- a/src/main/java/com/att/dmf/mr/metrics/publisher/impl/DMaaPCambriaSimplerBatchPublisher.java +++ b/src/main/java/com/att/dmf/mr/metrics/publisher/impl/DMaaPCambriaSimplerBatchPublisher.java @@ -25,6 +25,7 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.OutputStream; import java.net.MalformedURLException; +import java.nio.channels.NotYetConnectedException; import java.util.Collection; import java.util.LinkedList; import java.util.List; @@ -123,7 +124,12 @@ public class DMaaPCambriaSimplerBatchPublisher extends CambriaBaseClient try { return new DMaaPCambriaSimplerBatchPublisher(fUrls, fTopic, fMaxBatchSize, fMaxBatchAgeMs, fCompress); } catch (MalformedURLException e) { - throw new RuntimeException(e); + + NotYetConnectedException exception=new NotYetConnectedException(); + exception.setStackTrace(e.getStackTrace()); + + throw exception ; + } } diff --git a/src/main/java/com/att/dmf/mr/service/UIService.java b/src/main/java/com/att/dmf/mr/service/UIService.java index c62b3ff..1155a2a 100644 --- a/src/main/java/com/att/dmf/mr/service/UIService.java +++ b/src/main/java/com/att/dmf/mr/service/UIService.java @@ -27,7 +27,9 @@ package com.att.dmf.mr.service; import java.io.IOException; import org.apache.kafka.common.errors.TopicExistsException; +import org.json.JSONException; +import com.att.dmf.mr.CambriaApiException; import com.att.dmf.mr.beans.DMaaPContext; import com.att.nsa.configs.ConfigDbException; /** @@ -62,7 +64,7 @@ public interface UIService { * @throws Exception */ void getApiKey(DMaaPContext dmaapContext, final String apiKey) - throws Exception; + throws CambriaApiException, ConfigDbException, JSONException, IOException; /** * Fetching list of all the topics and returning in a templated form for diff --git a/src/main/java/com/att/dmf/mr/service/impl/MMServiceImpl.java b/src/main/java/com/att/dmf/mr/service/impl/MMServiceImpl.java index e55f510..387d8b1 100644 --- a/src/main/java/com/att/dmf/mr/service/impl/MMServiceImpl.java +++ b/src/main/java/com/att/dmf/mr/service/impl/MMServiceImpl.java @@ -404,7 +404,7 @@ public class MMServiceImpl implements MMService { int status = HttpStatus.SC_NOT_FOUND; String errorMsg = null; - if (excp instanceof CambriaApiException) { + if (excp.getClass().toString().contains("CambriaApiException")) { status = ((CambriaApiException) excp).getStatus(); JSONTokener jsonTokener = new JSONTokener(((CambriaApiException) excp).getBody()); JSONObject errObject = new JSONObject(jsonTokener); @@ -496,7 +496,7 @@ public class MMServiceImpl implements MMService { int status = HttpStatus.SC_NOT_FOUND; String errorMsg = null; - if (excp instanceof CambriaApiException) { + if (excp.getClass().toString().contains("CambriaApiException")) { status = ((CambriaApiException) excp).getStatus(); JSONTokener jsonTokener = new JSONTokener(((CambriaApiException) excp).getBody()); JSONObject errObject = new JSONObject(jsonTokener); @@ -541,7 +541,7 @@ public class MMServiceImpl implements MMService { } catch (Exception excp) { int status = HttpStatus.SC_NOT_FOUND; String errorMsg = null; - if (excp instanceof CambriaApiException) { + if (excp.getClass().toString().contains("CambriaApiException")) { status = ((CambriaApiException) excp).getStatus(); JSONTokener jsonTokener = new JSONTokener(((CambriaApiException) excp).getBody()); JSONObject errObject = new JSONObject(jsonTokener); @@ -581,7 +581,7 @@ public class MMServiceImpl implements MMService { } catch (Exception excp) { int status = HttpStatus.SC_NOT_FOUND; String errorMsg = null; - if (excp instanceof CambriaApiException) { + if (excp.getClass().toString().contains("CambriaApiException")) { status = ((CambriaApiException) excp).getStatus(); JSONTokener jsonTokener = new JSONTokener(((CambriaApiException) excp).getBody()); JSONObject errObject = new JSONObject(jsonTokener); diff --git a/src/main/java/com/att/dmf/mr/service/impl/TopicServiceImpl.java b/src/main/java/com/att/dmf/mr/service/impl/TopicServiceImpl.java index 4c872ad..f6d7b21 100644 --- a/src/main/java/com/att/dmf/mr/service/impl/TopicServiceImpl.java +++ b/src/main/java/com/att/dmf/mr/service/impl/TopicServiceImpl.java @@ -314,7 +314,7 @@ public class TopicServiceImpl implements TopicService { throw new CambriaApiException(errRes); } catch (com.att.dmf.mr.metabroker.Broker1.TopicExistsException e) { // TODO Auto-generated catch block - e.printStackTrace(); + LOGGER.error( e.getMessage()); } } diff --git a/src/main/java/com/att/dmf/mr/service/impl/UIServiceImpl.java b/src/main/java/com/att/dmf/mr/service/impl/UIServiceImpl.java index b624a39..73ad83b 100644 --- a/src/main/java/com/att/dmf/mr/service/impl/UIServiceImpl.java +++ b/src/main/java/com/att/dmf/mr/service/impl/UIServiceImpl.java @@ -30,9 +30,11 @@ import java.util.Map.Entry; import org.apache.kafka.common.errors.TopicExistsException; import org.json.JSONArray; +import org.json.JSONException; import org.json.JSONObject; import org.springframework.stereotype.Service; +import com.att.dmf.mr.CambriaApiException; import com.att.dmf.mr.beans.DMaaPContext; import com.att.dmf.mr.beans.DMaaPKafkaMetaBroker; import com.att.dmf.mr.metabroker.Topic; @@ -100,10 +102,13 @@ public class UIServiceImpl implements UIService { /** * @param dmaapContext * @param apiKey + * @throws ConfigDbException + * @throws IOException + * @throws JSONException * @throws Exception */ @Override - public void getApiKey(DMaaPContext dmaapContext, String apiKey) throws Exception { + public void getApiKey(DMaaPContext dmaapContext, String apiKey) throws CambriaApiException, ConfigDbException, JSONException, IOException { // TODO - We need to work on the templates and how data will be set in // the template LOGGER.info("Fetching detials of apikey: " + apiKey); @@ -114,7 +119,7 @@ public class UIServiceImpl implements UIService { DMaaPResponseBuilder.respondOk(dmaapContext, key.asJsonObject()); } else { LOGGER.info("Details of apikey [" + apiKey + "] not found. Returning response"); - throw new Exception("Key [" + apiKey + "] not found."); + throw new CambriaApiException(400,"Key [" + apiKey + "] not found."); } } diff --git a/src/main/java/com/att/dmf/mr/utils/ConfigurationReader.java b/src/main/java/com/att/dmf/mr/utils/ConfigurationReader.java index 17de391..aebca34 100644 --- a/src/main/java/com/att/dmf/mr/utils/ConfigurationReader.java +++ b/src/main/java/com/att/dmf/mr/utils/ConfigurationReader.java @@ -112,6 +112,7 @@ public class ConfigurationReader { * @throws ServletException * @throws KafkaConsumerCacheException * @throws ConfigDbException + * @throws KeyExistsException */ @Autowired public ConfigurationReader(@Qualifier("propertyReader") rrNvReadable settings, @@ -128,7 +129,7 @@ public class ConfigurationReader { */ @Qualifier("dMaaPAuthenticatorImpl") DMaaPAuthenticator<NsaSimpleApiKey> fSecurityManager ) - throws missingReqdSetting, invalidSettingValue, ServletException, KafkaConsumerCacheException, ConfigDbException { + throws missingReqdSetting, invalidSettingValue, ServletException, KafkaConsumerCacheException, ConfigDbException, KeyExistsException { this.fMetrics = fMetrics; this.zk = zk; @@ -158,7 +159,7 @@ public class ConfigurationReader { } protected void servletSetup() - throws rrNvReadable.missingReqdSetting, rrNvReadable.invalidSettingValue, ServletException, ConfigDbException { + throws rrNvReadable.missingReqdSetting, rrNvReadable.invalidSettingValue, ServletException, ConfigDbException, KeyExistsException { try { fMetrics.toJson(); @@ -169,19 +170,12 @@ public class ConfigurationReader { if ( adminSecret != null && adminSecret.length () > 0 ) { - try - { final NsaApiDb<NsaSimpleApiKey> adminDb = new BaseNsaApiDbImpl<NsaSimpleApiKey> ( new MemConfigDb(), new NsaSimpleApiKeyFactory() ); adminDb.createApiKey ( "admin", adminSecret ); fSecurityManager.addAuthenticator ( new DMaaPOriginalUebAuthenticator<NsaSimpleApiKey> ( adminDb, 10*60*1000 ) ); - } - catch ( KeyExistsException e ) - { - throw new RuntimeException ( "This key can't exist in a fresh in-memory DB!", e ); - } } // setup a backend diff --git a/src/main/java/com/att/dmf/mr/utils/Emailer.java b/src/main/java/com/att/dmf/mr/utils/Emailer.java index a940abf..1b68216 100644 --- a/src/main/java/com/att/dmf/mr/utils/Emailer.java +++ b/src/main/java/com/att/dmf/mr/utils/Emailer.java @@ -89,7 +89,6 @@ public class Emailer private static final EELFLogger log = EELFManager.getInstance().getLogger(Emailer.class); public static final String kSetting_MailAuthUser = "mailLogin"; - public static final String kSetting_MailAuthPwd = "mailPassword"; public static final String kSetting_MailFromEmail = "mailFromEmail"; public static final String kSetting_MailFromName = "mailFromName"; public static final String kSetting_SmtpServer = "mailSmtpServer"; @@ -162,14 +161,14 @@ public class Emailer makeSetting ( prop, "mail.smtp.starttls.enable", kSetting_SmtpServerSsl, true ); final String un = getSetting ( kSetting_MailAuthUser, "" ); - final String pw = getSetting ( kSetting_MailAuthPwd, "" ); + final String value=(AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"mailPassword")!=null)?AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"mailPassword"):""; final Session session = Session.getInstance ( prop, new javax.mail.Authenticator() { @Override protected PasswordAuthentication getPasswordAuthentication() { - return new PasswordAuthentication ( un, pw ); + return new PasswordAuthentication ( un, value ); } } ); diff --git a/src/main/java/com/att/dmf/mr/utils/Utils.java b/src/main/java/com/att/dmf/mr/utils/Utils.java index 70691cf..c2b8b88 100644 --- a/src/main/java/com/att/dmf/mr/utils/Utils.java +++ b/src/main/java/com/att/dmf/mr/utils/Utils.java @@ -21,16 +21,22 @@ *******************************************************************************/ package com.att.dmf.mr.utils; +import java.io.IOException; +import java.io.InputStream; import java.text.DecimalFormat; import java.text.SimpleDateFormat; import java.util.Date; import java.util.Enumeration; import java.util.LinkedList; import java.util.List; +import java.util.Properties; import javax.servlet.http.HttpServletRequest; +import com.att.dmf.mr.backends.kafka.KafkaPublisher; import com.att.dmf.mr.beans.DMaaPContext; +import com.att.eelf.configuration.EELFLogger; +import com.att.eelf.configuration.EELFManager; /** * This is an utility class for various operations for formatting * @author nilanjana.maity @@ -41,6 +47,7 @@ public class Utils { private static final String DATE_FORMAT = "dd-MM-yyyy::hh:mm:ss:SSS"; public static final String CAMBRIA_AUTH_HEADER = "X-CambriaAuth"; private static final String BATCH_ID_FORMAT = "000000"; + private static final EELFLogger log = EELFManager.getInstance().getLogger(Utils.class); private Utils() { super(); @@ -142,4 +149,17 @@ public class Utils { } return list; } + + public static String getKafkaproperty(){ + InputStream input = new Utils().getClass().getResourceAsStream("/kafka.properties"); + Properties props = new Properties(); + try { + props.load(input); + } catch (IOException e) { + log.error("failed to read kafka.properties"); + } + return props.getProperty("key"); + + + } } diff --git a/src/main/resources/kafka.properties b/src/main/resources/kafka.properties new file mode 100644 index 0000000..4a0eddb --- /dev/null +++ b/src/main/resources/kafka.properties @@ -0,0 +1 @@ +key=admin_secret;
\ No newline at end of file |