summaryrefslogtreecommitdiffstats
path: root/src/main
diff options
context:
space:
mode:
Diffstat (limited to 'src/main')
-rw-r--r--src/main/java/com/att/dmf/mr/beans/DMaaPKafkaMetaBroker.java31
-rw-r--r--src/main/java/com/att/dmf/mr/service/impl/ApiKeysServiceImpl.java10
2 files changed, 15 insertions, 26 deletions
diff --git a/src/main/java/com/att/dmf/mr/beans/DMaaPKafkaMetaBroker.java b/src/main/java/com/att/dmf/mr/beans/DMaaPKafkaMetaBroker.java
index 643eae9..4bef985 100644
--- a/src/main/java/com/att/dmf/mr/beans/DMaaPKafkaMetaBroker.java
+++ b/src/main/java/com/att/dmf/mr/beans/DMaaPKafkaMetaBroker.java
@@ -50,7 +50,7 @@ import com.att.dmf.mr.utils.ConfigurationReader;
//import org.apache.log4-j.Logger;
import com.att.eelf.configuration.EELFLogger;
import com.att.eelf.configuration.EELFManager;
-//import com.att.dmf.mr.backends.kafka.kafka011.SettingsUtil;
+
import com.att.nsa.configs.ConfigDb;
import com.att.nsa.configs.ConfigDbException;
import com.att.nsa.configs.ConfigPath;
@@ -85,11 +85,9 @@ public class DMaaPKafkaMetaBroker implements Broker1 {
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, fkafkaBrokers );
- /* props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='admin_secret';");
- props.put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
- props.put("sasl.mechanism", "PLAIN");*/
+
fKafkaAdminClient=AdminClient.create ( props );
- // fKafkaAdminClient = null;
+
}
//private static final Logger log = Logger.getLogger(DMaaPKafkaMetaBroker.class);
@@ -122,23 +120,21 @@ public class DMaaPKafkaMetaBroker implements Broker1 {
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, fkafkaBrokers );
- /* props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='admin_secret';");
- props.put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
- props.put("sasl.mechanism", "PLAIN");*/
+
fKafkaAdminClient=AdminClient.create ( props );
- // fKafkaAdminClient = null;
+
}
public DMaaPKafkaMetaBroker( rrNvReadable settings,
ZkClient zk, ConfigDb configDb,AdminClient client) {
- //fSettings = settings;
+
fZk = zk;
fCambriaConfig = configDb;
fBaseTopicData = configDb.parse("/topics");
fKafkaAdminClient= client;
- // fKafkaAdminClient = null;
+
}
@@ -235,13 +231,13 @@ public class DMaaPKafkaMetaBroker implements Broker1 {
}
catch ( InterruptedException e )
{
- //timer.fail ( "Timeout" );
+
log.warn ( "Execution of describeTopics timed out." );
throw new ConfigDbException ( e );
}
catch ( ExecutionException e )
{
- //timer.fail ( "ExecutionError" );
+
log.warn ( "Execution of describeTopics failed: " + e.getCause ().getMessage (), e.getCause () );
throw new ConfigDbException ( e.getCause () );
}
@@ -256,16 +252,11 @@ public class DMaaPKafkaMetaBroker implements Broker1 {
log.info("Loading zookeeper client for topic deletion.");
// topic creation. (Otherwise, the topic is only partially created
// in ZK.)
- /*zkClient = new ZkClient(ConfigurationReader.getMainZookeeperConnectionString(), 10000, 10000,
- ZKStringSerializer$.MODULE$);
- String strkSettings_KafkaZookeeper = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,CambriaConstants.kSetting_ZkConfigDbServers);
- if (null==strkSettings_KafkaZookeeper) strkSettings_KafkaZookeeper = CambriaConstants.kDefault_ZkConfigDbServers;
- ZkUtils zkutils =new ZkUtils(zkClient , new ZkConnection(strkSettings_KafkaZookeeper),false);
- */
+
fKafkaAdminClient.deleteTopics(Arrays.asList(topic));
log.info("Zookeeper client loaded successfully. Deleting topic.");
- //AdminUtils.deleteTopic(zkutils, topic);
+
} catch (Exception e) {
log.error("Failed to delete topic [" + topic + "]. " + e.getMessage(), e);
throw new ConfigDbException(e);
diff --git a/src/main/java/com/att/dmf/mr/service/impl/ApiKeysServiceImpl.java b/src/main/java/com/att/dmf/mr/service/impl/ApiKeysServiceImpl.java
index c818f88..b0e8a86 100644
--- a/src/main/java/com/att/dmf/mr/service/impl/ApiKeysServiceImpl.java
+++ b/src/main/java/com/att/dmf/mr/service/impl/ApiKeysServiceImpl.java
@@ -54,7 +54,7 @@ import com.att.nsa.security.db.simple.NsaSimpleApiKey;
@Service
public class ApiKeysServiceImpl implements ApiKeysService {
- //private Logger log = Logger.getLogger(ApiKeysServiceImpl.class.toString());
+
private static final EELFLogger log = EELFManager.getInstance().getLogger(ApiKeysServiceImpl.class.toString());
/**
* This method will provide all the ApiKeys present in kafka server.
@@ -139,7 +139,7 @@ public class ApiKeysServiceImpl implements ApiKeysService {
String kSetting_AllowAnonymousKeys= com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"apiKeys.allowAnonymous");
if(null==kSetting_AllowAnonymousKeys) kSetting_AllowAnonymousKeys ="false";
- // if ((contactEmail == null) || (contactEmail.length() == 0))
+
if ( kSetting_AllowAnonymousKeys.equalsIgnoreCase("true") && !emailProvided )
{
DMaaPResponseBuilder.respondWithErrorInJson(dmaapContext, 400, "You must provide an email address.");
@@ -165,7 +165,7 @@ public class ApiKeysServiceImpl implements ApiKeysService {
log.debug("=======ApiKeysServiceImpl: createApiKey : saving api key : "
+ key.toString() + "=====");
apiKeyDb.saveApiKey(key);
- // System.out.println("here4");
+
// email out the secret to validate the email address
if ( emailProvided )
{
@@ -196,9 +196,7 @@ public class ApiKeysServiceImpl implements ApiKeysService {
);
DMaaPResponseBuilder.respondOk(dmaapContext,
o);
- /*o.put("secret", "Emailed to " + contactEmail + ".");
- DMaaPResponseBuilder.respondOk(dmaapContext,
- o); */
+
return;
} else {
log.debug("=======ApiKeysServiceImpl: createApiKey : Error in creating API Key.=====");