-rw-r--r-- | src/main/java/com/att/dmf/mr/beans/DMaaPKafkaMetaBroker.java | 31
-rw-r--r-- | src/main/java/com/att/dmf/mr/service/impl/ApiKeysServiceImpl.java | 10
2 files changed, 15 insertions, 26 deletions
diff --git a/src/main/java/com/att/dmf/mr/beans/DMaaPKafkaMetaBroker.java b/src/main/java/com/att/dmf/mr/beans/DMaaPKafkaMetaBroker.java
index 643eae9..4bef985 100644
--- a/src/main/java/com/att/dmf/mr/beans/DMaaPKafkaMetaBroker.java
+++ b/src/main/java/com/att/dmf/mr/beans/DMaaPKafkaMetaBroker.java
@@ -50,7 +50,7 @@ import com.att.dmf.mr.utils.ConfigurationReader;
 //import org.apache.log4j.Logger;
 import com.att.eelf.configuration.EELFLogger;
 import com.att.eelf.configuration.EELFManager;
-//import com.att.dmf.mr.backends.kafka.kafka011.SettingsUtil;
+
 import com.att.nsa.configs.ConfigDb;
 import com.att.nsa.configs.ConfigDbException;
 import com.att.nsa.configs.ConfigPath;
@@ -85,11 +85,9 @@ public class DMaaPKafkaMetaBroker implements Broker1 {
     props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, fkafkaBrokers );
-    /* props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='admin_secret';");
-    props.put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
-    props.put("sasl.mechanism", "PLAIN");*/
+
     fKafkaAdminClient=AdminClient.create ( props );
-    // fKafkaAdminClient = null;
+
   }
   //private static final Logger log = Logger.getLogger(DMaaPKafkaMetaBroker.class);
@@ -122,23 +120,21 @@ public class DMaaPKafkaMetaBroker implements Broker1 {
     props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, fkafkaBrokers );
-    /* props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='admin_secret';");
-    props.put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
-    props.put("sasl.mechanism", "PLAIN");*/
+
     fKafkaAdminClient=AdminClient.create ( props );
-    // fKafkaAdminClient = null;
+
   }
   public DMaaPKafkaMetaBroker( rrNvReadable settings, ZkClient zk, ConfigDb configDb,AdminClient client) {
-    //fSettings = settings;
+
     fZk = zk;
     fCambriaConfig = configDb;
     fBaseTopicData = configDb.parse("/topics");
     fKafkaAdminClient= client;
-    // fKafkaAdminClient = null;
+
   }
@@ -235,13 +231,13 @@ public class DMaaPKafkaMetaBroker implements Broker1 {
     }
     catch ( InterruptedException e )
     {
-      //timer.fail ( "Timeout" );
+      log.warn ( "Execution of describeTopics timed out." );
       throw new ConfigDbException ( e );
     }
     catch ( ExecutionException e )
     {
-      //timer.fail ( "ExecutionError" );
+      log.warn ( "Execution of describeTopics failed: " + e.getCause ().getMessage (), e.getCause () );
       throw new ConfigDbException ( e.getCause () );
     }
@@ -256,16 +252,11 @@ public class DMaaPKafkaMetaBroker implements Broker1 {
       log.info("Loading zookeeper client for topic deletion.");
       // topic creation. (Otherwise, the topic is only partially created
       // in ZK.)
-      /*zkClient = new ZkClient(ConfigurationReader.getMainZookeeperConnectionString(), 10000, 10000,
-          ZKStringSerializer$.MODULE$);
-      String strkSettings_KafkaZookeeper = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,CambriaConstants.kSetting_ZkConfigDbServers);
-      if (null==strkSettings_KafkaZookeeper) strkSettings_KafkaZookeeper = CambriaConstants.kDefault_ZkConfigDbServers;
-      ZkUtils zkutils =new ZkUtils(zkClient , new ZkConnection(strkSettings_KafkaZookeeper),false);
-      */
+
       fKafkaAdminClient.deleteTopics(Arrays.asList(topic));
       log.info("Zookeeper client loaded successfully. Deleting topic.");
-      //AdminUtils.deleteTopic(zkutils, topic);
+
     } catch (Exception e) {
       log.error("Failed to delete topic [" + topic + "]. " + e.getMessage(), e);
       throw new ConfigDbException(e);
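For context on the first file: the change drops the commented-out SASL settings and the old ZooKeeper-based deletion path, and relies entirely on the Kafka AdminClient that is now always created from the bootstrap-server list. The following standalone sketch illustrates that admin-client pattern only, against the Kafka 0.11/1.x admin API this project uses; the broker address and topic name are hypothetical placeholders, and it assumes a plain (non-SASL) listener, matching the removal of the SASL properties above.

    import java.util.Arrays;
    import java.util.Properties;
    import java.util.concurrent.ExecutionException;

    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.admin.TopicDescription;

    public class AdminClientSketch {

        public static void main(String[] args) throws Exception {
            // Hypothetical bootstrap servers; the real broker reads this from its MR properties.
            Properties props = new Properties();
            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

            try (AdminClient admin = AdminClient.create(props)) {
                String topic = "example.topic"; // hypothetical topic name

                try {
                    // describeTopics(): get() surfaces broker-side failures as ExecutionException,
                    // which the patched code now logs before rethrowing as ConfigDbException.
                    TopicDescription description =
                            admin.describeTopics(Arrays.asList(topic)).values().get(topic).get();
                    System.out.println("Partitions: " + description.partitions().size());
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    System.err.println("describeTopics interrupted: " + e.getMessage());
                } catch (ExecutionException e) {
                    System.err.println("describeTopics failed: " + e.getCause().getMessage());
                }

                // deleteTopics(): the same call the patched deleteTopic() uses instead of the
                // old ZkClient/ZkUtils/AdminUtils path.
                admin.deleteTopics(Arrays.asList(topic)).all().get();
            }
        }
    }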
" + e.getMessage(), e); throw new ConfigDbException(e); diff --git a/src/main/java/com/att/dmf/mr/service/impl/ApiKeysServiceImpl.java b/src/main/java/com/att/dmf/mr/service/impl/ApiKeysServiceImpl.java index c818f88..b0e8a86 100644 --- a/src/main/java/com/att/dmf/mr/service/impl/ApiKeysServiceImpl.java +++ b/src/main/java/com/att/dmf/mr/service/impl/ApiKeysServiceImpl.java @@ -54,7 +54,7 @@ import com.att.nsa.security.db.simple.NsaSimpleApiKey; @Service public class ApiKeysServiceImpl implements ApiKeysService { - //private Logger log = Logger.getLogger(ApiKeysServiceImpl.class.toString()); + private static final EELFLogger log = EELFManager.getInstance().getLogger(ApiKeysServiceImpl.class.toString()); /** * This method will provide all the ApiKeys present in kafka server. @@ -139,7 +139,7 @@ public class ApiKeysServiceImpl implements ApiKeysService { String kSetting_AllowAnonymousKeys= com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"apiKeys.allowAnonymous"); if(null==kSetting_AllowAnonymousKeys) kSetting_AllowAnonymousKeys ="false"; - // if ((contactEmail == null) || (contactEmail.length() == 0)) + if ( kSetting_AllowAnonymousKeys.equalsIgnoreCase("true") && !emailProvided ) { DMaaPResponseBuilder.respondWithErrorInJson(dmaapContext, 400, "You must provide an email address."); @@ -165,7 +165,7 @@ public class ApiKeysServiceImpl implements ApiKeysService { log.debug("=======ApiKeysServiceImpl: createApiKey : saving api key : " + key.toString() + "====="); apiKeyDb.saveApiKey(key); - // System.out.println("here4"); + // email out the secret to validate the email address if ( emailProvided ) { @@ -196,9 +196,7 @@ public class ApiKeysServiceImpl implements ApiKeysService { ); DMaaPResponseBuilder.respondOk(dmaapContext, o); - /*o.put("secret", "Emailed to " + contactEmail + "."); - DMaaPResponseBuilder.respondOk(dmaapContext, - o); */ + return; } else { log.debug("=======ApiKeysServiceImpl: createApiKey : Error in creating API Key.====="); |