author    seanfos <sean.osullivan@est.tech>  2021-10-06 16:09:15 +0100
committer seanfos <sean.osullivan@est.tech>  2021-10-11 11:08:55 +0100
commit    a5b9e047b91a933ab1485011b459bfeac6e857ce (patch)
tree      17071362de32ff9b4855bd1ded09ec90d57e455b /src/main/java/org
parent    80adb1f3525753841a7853d245dacc894417a4f7 (diff)
[MR] Add support for configuring jaas.sasl.config at runtime
Signed-off-by: seanfos <sean.osullivan@est.tech>
Change-Id: I92a6fdb9e375db7b355e19127a5fdbe2b4d2a827
Issue-ID: DMAAP-1653
Diffstat (limited to 'src/main/java/org')
-rw-r--r--  src/main/java/org/onap/dmaap/dmf/mr/backends/kafka/KafkaPublisher.java    137
-rw-r--r--  src/main/java/org/onap/dmaap/dmf/mr/beans/DMaaPKafkaConsumerFactory.java   59
-rw-r--r--  src/main/java/org/onap/dmaap/dmf/mr/beans/DMaaPKafkaMetaBroker.java       171
-rw-r--r--  src/main/java/org/onap/dmaap/dmf/mr/utils/Utils.java                      121
4 files changed, 249 insertions(+), 239 deletions(-)
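
This commit replaces three inline copies of the sasl.jaas.config / SASL_PLAINTEXT wiring (publisher, consumer factory, and both AdminClient constructors) with a single Utils.addSaslProps() helper that picks the SASL mechanism from the environment at runtime. A minimal standalone sketch of that selection, assuming the SASLMECH and JAASLOGIN environment variables introduced by the helper further down in this diff (the shipped PLAIN branch additionally builds a PlainLoginModule JAAS string from kafka.properties, omitted here):

import java.util.Properties;

// Sketch only, not the shipped class; mirrors the branch in Utils.addSaslProps().
public class SaslPropsSketch {
    public static Properties resolve() {
        final Properties props = new Properties();
        final String saslMech = System.getenv("SASLMECH");
        if ("scram-sha-512".equals(saslMech)) {
            // JAASLOGIN is expected to hold a complete JAAS config string
            // supplied by the deployment (e.g. a ScramLoginModule entry).
            props.put("sasl.jaas.config", System.getenv("JAASLOGIN"));
            props.put("sasl.mechanism", saslMech.toUpperCase()); // SCRAM-SHA-512
        } else {
            // Any other value, or no SASLMECH at all, keeps the old PLAIN setup.
            props.put("sasl.mechanism", "PLAIN");
        }
        props.put("security.protocol", "SASL_PLAINTEXT");
        return props;
    }
}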
diff --git a/src/main/java/org/onap/dmaap/dmf/mr/backends/kafka/KafkaPublisher.java b/src/main/java/org/onap/dmaap/dmf/mr/backends/kafka/KafkaPublisher.java
index 62dc2a5..ac40603 100644
--- a/src/main/java/org/onap/dmaap/dmf/mr/backends/kafka/KafkaPublisher.java
+++ b/src/main/java/org/onap/dmaap/dmf/mr/backends/kafka/KafkaPublisher.java
@@ -8,22 +8,23 @@
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
-*
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* ============LICENSE_END=========================================================
- *
+ *
* ECOMP is a trademark and service mark of AT&T Intellectual Property.
- *
+ *
*******************************************************************************/
package org.onap.dmaap.dmf.mr.backends.kafka;
import com.att.eelf.configuration.EELFLogger;
import com.att.eelf.configuration.EELFManager;
import com.att.nsa.drumlin.till.nv.rrNvReadable;
+import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
@@ -43,9 +44,9 @@ import java.util.Properties;
/**
* Sends raw JSON objects into Kafka.
- *
+ *
* Could improve space: BSON rather than JSON?
- *
+ *
* @author peter
*
*/
@@ -53,7 +54,7 @@ import java.util.Properties;
public class KafkaPublisher implements Publisher {
/**
* constructor initializing
- *
+ *
* @param settings
* @throws rrNvReadable.missingReqdSetting
*/
@@ -62,35 +63,33 @@ public class KafkaPublisher implements Publisher {
final Properties props = new Properties();
String kafkaConnUrl= com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"kafka.metadata.broker.list");
if(StringUtils.isEmpty(kafkaConnUrl)){
-
+
kafkaConnUrl="localhost:9092";
}
-
-
- if(Utils.isCadiEnabled()){
- transferSetting( props, "sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='"+Utils.getKafkaproperty()+"';");
- transferSetting( props, "security.protocol", "SASL_PLAINTEXT");
- transferSetting( props, "sasl.mechanism", "PLAIN");
- }
+
+
+ if(Utils.isCadiEnabled()){
+ props.putAll(Utils.addSaslProps());
+ }
transferSetting( props, "bootstrap.servers",kafkaConnUrl);
-
+
transferSetting( props, "request.required.acks", "1");
transferSetting( props, "message.send.max.retries", "5");
- transferSetting(props, "retry.backoff.ms", "150");
+ transferSetting(props, "retry.backoff.ms", "150");
+
+
-
-
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
-
-
+
+
fProducer = new KafkaProducer<>(props);
}
/**
* Send a message with a given topic and key.
- *
+ *
* @param msg
* @throws FailedToSendMessageException
* @throws JSONException
@@ -102,21 +101,21 @@ public class KafkaPublisher implements Publisher {
sendMessages(topic, msgs);
}
- /**
+ /**
* method publishing batch messages
- * This method is commented from 0.8 to 0.11 upgrade
+ * This method is commented from 0.8 to 0.11 upgrade
* @param topic
* @param kms
* throws IOException
*
public void sendBatchMessage(String topic, ArrayList<KeyedMessage<String, String>> kms) throws IOException {
- try {
- fProducer.send(kms);
+ try {
+ fProducer.send(kms);
- } catch (FailedToSendMessageException excp) {
- log.error("Failed to send message(s) to topic [" + topic + "].", excp);
- throw new FailedToSendMessageException(excp.getMessage(), excp);
- }
+ } catch (FailedToSendMessageException excp) {
+ log.error("Failed to send message(s) to topic [" + topic + "].", excp);
+ throw new FailedToSendMessageException(excp.getMessage(), excp);
+ }
} */
@@ -131,63 +130,63 @@ public class KafkaPublisher implements Publisher {
fProducer.send(km);
}
- } catch (Exception excp) {
+ } catch (Exception excp) {
log.error("Failed to send message(s) to topic [" + topic + "].", excp);
throw new IOException(excp.getMessage(), excp);
}
}
-
+
/**
* Send a set of messages. Each must have a "key" string value.
- *
+ *
* @param topic
* @param msg
* @throws FailedToSendMessageException
* @throws JSONException
*
+ @Override
+ public void sendMessages(String topic, List<? extends message> msgs)
+ throws IOException, FailedToSendMessageException {
+ log.info("sending " + msgs.size() + " events to [" + topic + "]");
+
+ final List<KeyedMessage<String, String>> kms = new ArrayList<KeyedMessage<String, String>>(msgs.size());
+ for (message o : msgs) {
+ final KeyedMessage<String, String> data = new KeyedMessage<String, String>(topic, o.getKey(), o.toString());
+ kms.add(data);
+ }
+ try {
+ fProducer.send(kms);
+
+ } catch (FailedToSendMessageException excp) {
+ log.error("Failed to send message(s) to topic [" + topic + "].", excp);
+ throw new FailedToSendMessageException(excp.getMessage(), excp);
+ }
+ } */
@Override
- public void sendMessages(String topic, List<? extends message> msgs)
- throws IOException, FailedToSendMessageException {
+ public void sendMessagesNew(String topic, List<? extends message> msgs) throws IOException {
log.info("sending " + msgs.size() + " events to [" + topic + "]");
-
- final List<KeyedMessage<String, String>> kms = new ArrayList<KeyedMessage<String, String>>(msgs.size());
- for (message o : msgs) {
- final KeyedMessage<String, String> data = new KeyedMessage<String, String>(topic, o.getKey(), o.toString());
- kms.add(data);
- }
try {
- fProducer.send(kms);
-
- } catch (FailedToSendMessageException excp) {
- log.error("Failed to send message(s) to topic [" + topic + "].", excp);
- throw new FailedToSendMessageException(excp.getMessage(), excp);
+ for (message o : msgs) {
+ final ProducerRecord<String, String> data =
+ new ProducerRecord<>(topic, o.getKey(), o.toString());
+ fProducer.send(data);
+ }
+ } catch (Exception e) {
+ log.error("Failed to send message(s) to topic [" + topic + "].", e);
}
- } */
- @Override
- public void sendMessagesNew(String topic, List<? extends message> msgs) throws IOException {
- log.info("sending " + msgs.size() + " events to [" + topic + "]");
- try {
- for (message o : msgs) {
- final ProducerRecord<String, String> data =
- new ProducerRecord<>(topic, o.getKey(), o.toString());
- fProducer.send(data);
- }
- } catch (Exception e) {
- log.error("Failed to send message(s) to topic [" + topic + "].", e);
- }
- }
-
-
+ }
+
+
private Producer<String, String> fProducer;
- /**
- * It sets the key value pair
- * @param topic
- * @param msg
- * @param key
- * @param defVal
- */
+ /**
+ * It sets the key value pair
+ * @param topic
+ * @param msg
+ * @param key
+ * @param defVal
+ */
private void transferSetting(Properties props, String key, String defVal) {
String kafkaProp= com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"kafka." + key);
if (StringUtils.isEmpty(kafkaProp)) kafkaProp=defVal;
@@ -199,6 +198,6 @@ public class KafkaPublisher implements Publisher {
@Override
public void sendMessages(String topic, List<? extends message> msgs) throws IOException {
// TODO Auto-generated method stub
-
+
}
}
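
With the helper in place, the publisher constructor above reduces to plain connection settings plus one putAll. A condensed sketch of that flow, assuming the Utils helper from this commit (the real constructor resolves each value through transferSetting, which consults AJSCPropertiesMap before falling back to these defaults):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.onap.dmaap.dmf.mr.utils.Utils;

// Sketch only: the constructor above, reduced to its configuration steps.
public class PublisherWiringSketch {
    public static KafkaProducer<String, String> build(String kafkaConnUrl) {
        final Properties props = new Properties();
        if (Utils.isCadiEnabled()) {
            props.putAll(Utils.addSaslProps()); // shared SASL settings
        }
        props.put("bootstrap.servers", kafkaConnUrl); // defaults to localhost:9092 above
        props.put("request.required.acks", "1");
        props.put("message.send.max.retries", "5");
        props.put("retry.backoff.ms", "150");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return new KafkaProducer<>(props);
    }
}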
diff --git a/src/main/java/org/onap/dmaap/dmf/mr/beans/DMaaPKafkaConsumerFactory.java b/src/main/java/org/onap/dmaap/dmf/mr/beans/DMaaPKafkaConsumerFactory.java
index 26a8cf4..6f5a17c 100644
--- a/src/main/java/org/onap/dmaap/dmf/mr/beans/DMaaPKafkaConsumerFactory.java
+++ b/src/main/java/org/onap/dmaap/dmf/mr/beans/DMaaPKafkaConsumerFactory.java
@@ -8,16 +8,16 @@
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
-*
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* ============LICENSE_END=========================================================
- *
+ *
* ECOMP is a trademark and service mark of AT&T Intellectual Property.
- *
+ *
*******************************************************************************/
package org.onap.dmaap.dmf.mr.beans;
@@ -27,6 +27,7 @@ import com.att.eelf.configuration.EELFManager;
import com.att.nsa.drumlin.till.nv.rrNvReadable.missingReqdSetting;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
+import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.onap.dmaap.dmf.mr.CambriaApiException;
import org.onap.dmaap.dmf.mr.backends.Consumer;
@@ -52,13 +53,13 @@ import java.util.concurrent.TimeUnit;
*/
public class DMaaPKafkaConsumerFactory implements ConsumerFactory {
-
+
private static final EELFLogger log = EELFManager.getInstance().getLogger(DMaaPKafkaConsumerFactory.class);
-
+
/**
* constructor initialization
- *
+ *
* @param settings
* @param metrics
* @param curator
@@ -68,8 +69,8 @@ public class DMaaPKafkaConsumerFactory implements ConsumerFactory {
*/
public DMaaPKafkaConsumerFactory(@Qualifier("dMaaPMetricsSet") MetricsSet metrics,
- @Qualifier("curator") CuratorFramework curator,
- @Qualifier("kafkalockavoid") KafkaLiveLockAvoider2 kafkaLiveLockAvoider)
+ @Qualifier("curator") CuratorFramework curator,
+ @Qualifier("kafkalockavoid") KafkaLiveLockAvoider2 kafkaLiveLockAvoider)
throws missingReqdSetting, KafkaConsumerCacheException, UnknownHostException {
String apiNodeId = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
@@ -97,7 +98,7 @@ public class DMaaPKafkaConsumerFactory implements ConsumerFactory {
final boolean isCacheEnabled = kSetting_EnableCache;
-
+
fCache = null;
if (isCacheEnabled) {
fCache = KafkaConsumerCache.getInstance();
@@ -108,15 +109,15 @@ public class DMaaPKafkaConsumerFactory implements ConsumerFactory {
fCache.setfApiId(apiNodeId);
fCache.startCache(mode, curator);
if(kafkaLiveLockAvoider!=null){
- kafkaLiveLockAvoider.startNewWatcherForServer(apiNodeId, makeAvoidanceCallback(apiNodeId));
- fkafkaLiveLockAvoider = kafkaLiveLockAvoider;
+ kafkaLiveLockAvoider.startNewWatcherForServer(apiNodeId, makeAvoidanceCallback(apiNodeId));
+ fkafkaLiveLockAvoider = kafkaLiveLockAvoider;
}
}
}
/*
* getConsumerFor
- *
+ *
* @see
* com.att.dmf.mr.backends.ConsumerFactory#getConsumerFor(java.lang.String,
* java.lang.String, java.lang.String, int, java.lang.String) This method is
@@ -128,7 +129,7 @@ public class DMaaPKafkaConsumerFactory implements ConsumerFactory {
*/
@Override
public Consumer getConsumerFor(String topic, String consumerGroupName, String consumerId, int timeoutMs,
- String remotehost) throws UnavailableException, CambriaApiException {
+ String remotehost) throws UnavailableException, CambriaApiException {
Kafka011Consumer kc;
// To synchronize based on the consumer group.
@@ -179,11 +180,11 @@ public class DMaaPKafkaConsumerFactory implements ConsumerFactory {
log.info("Creating Kafka consumer for group [" + consumerGroupName + "], consumer [" + consumerId
+ "], on topic [" + topic + "].");
-
+
if (fCache != null) {
fCache.signalOwnership(topic, consumerGroupName, consumerId);
}
-
+
final Properties props = createConsumerConfig(topic,consumerGroupName, consumerId);
long fCreateTimeMs = System.currentTimeMillis();
KafkaConsumer<String, String> cc = new KafkaConsumer<>(props);
@@ -210,7 +211,7 @@ public class DMaaPKafkaConsumerFactory implements ConsumerFactory {
+ consumerId);
log.error("Failed and go to Exception block " + e.getMessage() + " " + consumerGroupName + "/"
+ consumerId);
-
+
} finally {
if (locked) {
try {
@@ -242,7 +243,7 @@ public class DMaaPKafkaConsumerFactory implements ConsumerFactory {
fCache.dropAllConsumers();
}
-
+
private KafkaConsumerCache fCache;
private KafkaLiveLockAvoider2 fkafkaLiveLockAvoider;
private String fkafkaBrokers;
@@ -256,38 +257,36 @@ public class DMaaPKafkaConsumerFactory implements ConsumerFactory {
private void transferSettingIfProvided(Properties target, String key, String prefix) {
String keyVal = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, makeLongKey(key, prefix));
-
+
if (null != keyVal) {
-
+
log.info("Setting [" + key + "] to " + keyVal + ".");
target.put(key, keyVal);
}
}
/**
- * Name CreateConsumerconfig
+ * Name CreateConsumerconfig
* @param topic
* @param groupId
* @param consumerId
* @return Properties
- *
+ *
* This method is to create Properties required to create kafka connection
- * Group name is replaced with different format groupid--topic to address same
- * groupids for multiple topics. Same groupid with multiple topics
+ * Group name is replaced with different format groupid--topic to address same
+ * groupids for multiple topics. Same groupid with multiple topics
* may start frequent consumer rebalancing on all the topics . Replacing them makes it unique
*/
private Properties createConsumerConfig(String topic ,String groupId, String consumerId) {
final Properties props = new Properties();
//fakeGroupName is added to avoid multiple consumer group for multiple topics.Donot Change this logic
- //Fix for CPFMF-644 :
+ //Fix for CPFMF-644 :
final String fakeGroupName = groupId + "--" + topic;
props.put("group.id", fakeGroupName);
props.put("enable.auto.commit", "false"); // 0.11
props.put("bootstrap.servers", fkafkaBrokers);
if(Utils.isCadiEnabled()){
- props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='"+Utils.getKafkaproperty()+"';");
- props.put("security.protocol", "SASL_PLAINTEXT");
- props.put("sasl.mechanism", "PLAIN");
+ props.putAll(Utils.addSaslProps());
}
props.put("client.id", consumerId);
@@ -314,7 +313,7 @@ public class DMaaPKafkaConsumerFactory implements ConsumerFactory {
/**
* putting values in hashmap like consumer timeout, zookeeper time out, etc
- *
+ *
* @param setting
*/
private static void populateKafkaInternalDefaultsMap() { }
@@ -322,7 +321,7 @@ public class DMaaPKafkaConsumerFactory implements ConsumerFactory {
/*
* The starterIncremnt value is just to emulate calling certain consumers,
* in this test app all the consumers are local
- *
+ *
*/
private LiveLockAvoidance makeAvoidanceCallback(final String appId) {
@@ -346,7 +345,7 @@ public class DMaaPKafkaConsumerFactory implements ConsumerFactory {
@SuppressWarnings("rawtypes")
@Override
public HashMap getConsumerForKafka011(String topic, String consumerGroupName, String consumerId, int timeoutMs,
- String remotehost) throws UnavailableException, CambriaApiException {
+ String remotehost) throws UnavailableException, CambriaApiException {
// TODO Auto-generated method stub
return null;
}
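
The Javadoc on createConsumerConfig above explains the group-id rewrite; a sketch of the properties it produces, again assuming the shared Utils helper (brokers and ids are caller-supplied, and the real method also applies transferSettingIfProvided overrides):

import java.util.Properties;
import org.onap.dmaap.dmf.mr.utils.Utils;

// Sketch of the per-topic group id: suffixing the topic keeps one logical
// group id used on several topics from triggering cross-topic rebalances.
public class ConsumerConfigSketch {
    static Properties createConsumerConfig(String topic, String groupId,
                                           String consumerId, String brokers) {
        final Properties props = new Properties();
        props.put("group.id", groupId + "--" + topic); // unique per topic
        props.put("enable.auto.commit", "false");      // offsets committed explicitly
        props.put("bootstrap.servers", brokers);
        if (Utils.isCadiEnabled()) {
            props.putAll(Utils.addSaslProps());
        }
        props.put("client.id", consumerId);
        return props;
    }
}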
diff --git a/src/main/java/org/onap/dmaap/dmf/mr/beans/DMaaPKafkaMetaBroker.java b/src/main/java/org/onap/dmaap/dmf/mr/beans/DMaaPKafkaMetaBroker.java
index 9ab4c83..7a08345 100644
--- a/src/main/java/org/onap/dmaap/dmf/mr/beans/DMaaPKafkaMetaBroker.java
+++ b/src/main/java/org/onap/dmaap/dmf/mr/beans/DMaaPKafkaMetaBroker.java
@@ -8,16 +8,16 @@
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
-*
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* ============LICENSE_END=========================================================
- *
+ *
* ECOMP is a trademark and service mark of AT&T Intellectual Property.
- *
+ *
*******************************************************************************/
package org.onap.dmaap.dmf.mr.beans;
@@ -55,7 +55,7 @@ import java.util.concurrent.ExecutionException;
/**
* class performing all topic operations
- *
+ *
* @author anowarul.islam
*
*/
@@ -73,32 +73,29 @@ public class DMaaPKafkaMetaBroker implements Broker1 {
fkafkaBrokers = "localhost:9092";
}
-
- props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, fkafkaBrokers );
- if(Utils.isCadiEnabled()){
- props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='"+Utils.getKafkaproperty()+"';");
- props.put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
- props.put("sasl.mechanism", "PLAIN");
- }
-
- fKafkaAdminClient=AdminClient.create ( props );
-
+
+ props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, fkafkaBrokers );
+ if(Utils.isCadiEnabled()){
+ props.putAll(Utils.addSaslProps());
+ }
+ fKafkaAdminClient=AdminClient.create ( props );
+
}
private static final EELFLogger log = EELFManager.getInstance().getLogger(ConfigurationReader.class);
private final AdminClient fKafkaAdminClient;
-
-
+
+
/**
* DMaaPKafkaMetaBroker constructor initializing
- *
+ *
* @param settings
* @param zk
* @param configDb
*/
public DMaaPKafkaMetaBroker(@Qualifier("propertyReader") rrNvReadable settings,
- @Qualifier("dMaaPZkClient") ZkClient zk, @Qualifier("dMaaPZkConfigDb") ConfigDb configDb) {
+ @Qualifier("dMaaPZkClient") ZkClient zk, @Qualifier("dMaaPZkConfigDb") ConfigDb configDb) {
fZk = zk;
fCambriaConfig = configDb;
fBaseTopicData = configDb.parse("/topics");
@@ -109,30 +106,28 @@ public class DMaaPKafkaMetaBroker implements Broker1 {
fkafkaBrokers = "localhost:9092";
}
-
- if(Utils.isCadiEnabled()){
- props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='"+Utils.getKafkaproperty()+"';");
- props.put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
- props.put("sasl.mechanism", "PLAIN");
- }
- props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, fkafkaBrokers );
-
- fKafkaAdminClient=AdminClient.create ( props );
-
-
-
+
+ if(Utils.isCadiEnabled()){
+ props.putAll(Utils.addSaslProps());
+ }
+ props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, fkafkaBrokers );
+
+ fKafkaAdminClient=AdminClient.create ( props );
+
+
+
}
-
+
public DMaaPKafkaMetaBroker( rrNvReadable settings,
- ZkClient zk, ConfigDb configDb,AdminClient client) {
-
+ ZkClient zk, ConfigDb configDb,AdminClient client) {
+
fZk = zk;
fCambriaConfig = configDb;
fBaseTopicData = configDb.parse("/topics");
- fKafkaAdminClient= client;
-
-
-
+ fKafkaAdminClient= client;
+
+
+
}
@Override
@@ -169,7 +164,7 @@ public class DMaaPKafkaMetaBroker implements Broker1 {
/**
* static method get KafkaTopic object
- *
+ *
* @param db
* @param base
* @param topic
@@ -185,7 +180,7 @@ public class DMaaPKafkaMetaBroker implements Broker1 {
*/
@Override
public Topic createTopic(String topic, String desc, String ownerApiKey, int partitions, int replicas,
- boolean transactionEnabled) throws TopicExistsException, CambriaApiException,ConfigDbException {
+ boolean transactionEnabled) throws TopicExistsException, CambriaApiException,ConfigDbException {
log.info("Creating topic: " + topic);
try {
log.info("Check if topic [" + topic + "] exist.");
@@ -216,23 +211,23 @@ public class DMaaPKafkaMetaBroker implements Broker1 {
// create via kafka
- try {
- final NewTopic topicRequest =
- new NewTopic(topic, partitions, (short)replicas);
- final CreateTopicsResult ctr =
- fKafkaAdminClient.createTopics(Arrays.asList(topicRequest));
- final KafkaFuture<Void> ctrResult = ctr.all();
- ctrResult.get();
- // underlying Kafka topic created. now setup our API info
- return createTopicEntry(topic, desc, ownerApiKey, transactionEnabled);
- } catch (InterruptedException e) {
- log.warn("Execution of describeTopics timed out.");
- throw new ConfigDbException(e);
- } catch (ExecutionException e) {
- log.warn("Execution of describeTopics failed: " + e.getCause().getMessage(), e);
- throw new ConfigDbException(e.getCause());
- }
-
+ try {
+ final NewTopic topicRequest =
+ new NewTopic(topic, partitions, (short)replicas);
+ final CreateTopicsResult ctr =
+ fKafkaAdminClient.createTopics(Arrays.asList(topicRequest));
+ final KafkaFuture<Void> ctrResult = ctr.all();
+ ctrResult.get();
+ // underlying Kafka topic created. now setup our API info
+ return createTopicEntry(topic, desc, ownerApiKey, transactionEnabled);
+ } catch (InterruptedException e) {
+ log.warn("Execution of describeTopics timed out.");
+ throw new ConfigDbException(e);
+ } catch (ExecutionException e) {
+ log.warn("Execution of describeTopics failed: " + e.getCause().getMessage(), e);
+ throw new ConfigDbException(e.getCause());
+ }
+
}
@Override
@@ -240,13 +235,13 @@ public class DMaaPKafkaMetaBroker implements Broker1 {
log.info("Deleting topic: " + topic);
try {
log.info("Loading zookeeper client for topic deletion.");
- // topic creation. (Otherwise, the topic is only partially created
+ // topic creation. (Otherwise, the topic is only partially created
// in ZK.)
-
-
+
+
fKafkaAdminClient.deleteTopics(Arrays.asList(topic));
log.info("Zookeeper client loaded successfully. Deleting topic.");
-
+
} catch (Exception e) {
log.error("Failed to delete topic [" + topic + "]. " + e.getMessage(), e);
throw new ConfigDbException(e);
@@ -265,7 +260,7 @@ public class DMaaPKafkaMetaBroker implements Broker1 {
/**
* method Providing KafkaTopic Object associated with owner and
* transactionenabled or not
- *
+ *
* @param name
* @param desc
* @param owner
@@ -280,7 +275,7 @@ public class DMaaPKafkaMetaBroker implements Broker1 {
/**
* static method giving kafka topic object
- *
+ *
* @param db
* @param basePath
* @param name
@@ -291,7 +286,7 @@ public class DMaaPKafkaMetaBroker implements Broker1 {
* @throws ConfigDbException
*/
public static KafkaTopic createTopicEntry(ConfigDb db, ConfigPath basePath, String name, String desc, String owner,
- boolean transactionEnabled) throws ConfigDbException {
+ boolean transactionEnabled) throws ConfigDbException {
final JSONObject o = new JSONObject();
o.put("owner", owner);
o.put("description", desc);
@@ -303,14 +298,14 @@ public class DMaaPKafkaMetaBroker implements Broker1 {
/**
* class performing all user opearation like user is eligible to read,
* write. permitting a user to write and read,
- *
+ *
* @author anowarul.islam
*
*/
public static class KafkaTopic implements Topic {
/**
* constructor initializes
- *
+ *
* @param name
* @param configdb
* @param baseTopic
@@ -330,26 +325,26 @@ public class DMaaPKafkaMetaBroker implements Broker1 {
fOwner = o.optString("owner", "");
fDesc = o.optString("description", "");
fTransactionEnabled = o.optBoolean("txenabled", false);// default
- // value is
- // false
+ // value is
+ // false
// if this topic has an owner, it needs both read/write ACLs. If there's no
- // owner (or it's empty), null is okay -- this is for existing or implicitly
- // created topics.
- JSONObject readers = o.optJSONObject ( "readers" );
- if ( readers == null && fOwner.length () > 0 )
- {
- readers = kEmptyAcl;
- }
- fReaders = fromJson ( readers );
-
- JSONObject writers = o.optJSONObject ( "writers" );
- if ( writers == null && fOwner.length () > 0 )
- {
- writers = kEmptyAcl;
- }
- fWriters = fromJson ( writers );
+ // owner (or it's empty), null is okay -- this is for existing or implicitly
+ // created topics.
+ JSONObject readers = o.optJSONObject ( "readers" );
+ if ( readers == null && fOwner.length () > 0 )
+ {
+ readers = kEmptyAcl;
+ }
+ fReaders = fromJson ( readers );
+
+ JSONObject writers = o.optJSONObject ( "writers" );
+ if ( writers == null && fOwner.length () > 0 )
+ {
+ writers = kEmptyAcl;
+ }
+ fWriters = fromJson ( writers );
}
-
+
private NsaAcl fromJson(JSONObject o) {
NsaAcl acl = new NsaAcl();
if (o != null) {
@@ -427,7 +422,7 @@ public class DMaaPKafkaMetaBroker implements Broker1 {
try
{
final NsaAcl acl = NsaAclUtils.updateAcl ( this, asUser, key, reader, add );
-
+
// we have to assume we have current data, or load it again. for the expected use
// case, assuming we can overwrite the data is fine.
final JSONObject o = new JSONObject ();
@@ -435,15 +430,15 @@ public class DMaaPKafkaMetaBroker implements Broker1 {
o.put ( "readers", safeSerialize ( reader ? acl : fReaders ) );
o.put ( "writers", safeSerialize ( reader ? fWriters : acl ) );
fConfigDb.store ( fBaseTopicData.getChild ( fName ), o.toString () );
-
+
log.info ( "ACL_UPDATE: " + asUser.getKey () + " " + ( add ? "added" : "removed" ) + ( reader?"subscriber":"publisher" ) + " " + key + " on " + fName );
-
+
}
catch ( ConfigDbException | AccessDeniedException x )
{
throw x;
}
-
+
}
private JSONObject safeSerialize(NsaAcl acl) {
@@ -458,7 +453,7 @@ public class DMaaPKafkaMetaBroker implements Broker1 {
private final NsaAcl fReaders;
private final NsaAcl fWriters;
private boolean fTransactionEnabled;
-
+
public boolean isTransactionEnabled() {
return fTransactionEnabled;
}
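
Both DMaaPKafkaMetaBroker constructors now bootstrap the AdminClient the same way. A compressed sketch combining that wiring with the createTopics call from the hunk above (ConfigDbException wrapping simplified to a plain throws; the shipped code keeps the AdminClient for the broker's lifetime rather than closing it per call):

import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.onap.dmaap.dmf.mr.utils.Utils;

// Sketch only: AdminClient bootstrap plus topic creation, condensed.
public class MetaBrokerSketch {
    public static void createTopic(String brokers, String topic,
                                   int partitions, short replicas) throws Exception {
        final Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
        if (Utils.isCadiEnabled()) {
            props.putAll(Utils.addSaslProps());
        }
        try (AdminClient admin = AdminClient.create(props)) {
            admin.createTopics(Arrays.asList(new NewTopic(topic, partitions, replicas)))
                 .all()
                 .get(); // block until Kafka acknowledges the creation
        }
    }
}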
diff --git a/src/main/java/org/onap/dmaap/dmf/mr/utils/Utils.java b/src/main/java/org/onap/dmaap/dmf/mr/utils/Utils.java
index 662f0f7..c420072 100644
--- a/src/main/java/org/onap/dmaap/dmf/mr/utils/Utils.java
+++ b/src/main/java/org/onap/dmaap/dmf/mr/utils/Utils.java
@@ -8,30 +8,30 @@
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
-*
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* ============LICENSE_END=========================================================
- *
+ *
* ECOMP is a trademark and service mark of AT&T Intellectual Property.
- *
+ *
*******************************************************************************/
package org.onap.dmaap.dmf.mr.utils;
import com.att.eelf.configuration.EELFLogger;
import com.att.eelf.configuration.EELFManager;
-import org.onap.dmaap.dmf.mr.beans.DMaaPContext;
-
-import javax.servlet.http.HttpServletRequest;
import java.io.IOException;
import java.io.InputStream;
import java.security.Principal;
import java.text.DecimalFormat;
import java.text.SimpleDateFormat;
import java.util.*;
+import javax.servlet.http.HttpServletRequest;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.onap.dmaap.dmf.mr.beans.DMaaPContext;
/**
* This is an utility class for various operations for formatting
@@ -46,13 +46,14 @@ public class Utils {
private static final String BATCH_ID_FORMAT = "000000";
private static final String X509_ATTR = "javax.servlet.request.X509Certificate";
private static final EELFLogger log = EELFManager.getInstance().getLogger(Utils.class);
+ public static final String SASL_MECH = "sasl.mechanism";
private Utils() {
super();
}
/**
- * Formatting the date
+ * Formatting the date
* @param date
* @return date or null
*/
@@ -128,55 +129,71 @@ public class Utils {
*/
public static long getSleepMsForRate ( double ratePerMinute )
{
- if ( ratePerMinute <= 0.0 )
+ if ( ratePerMinute <= 0.0 )
{
return 0;
}
return Math.max ( 1000, Math.round ( 60 * 1000 / ratePerMinute ) );
}
- public static String getRemoteAddress(DMaaPContext ctx)
- {
- String reqAddr = ctx.getRequest().getRemoteAddr();
- String fwdHeader = getFirstHeader("X-Forwarded-For",ctx);
- return ((fwdHeader != null) ? fwdHeader : reqAddr);
- }
- public static String getFirstHeader(String h,DMaaPContext ctx)
- {
- List l = getHeader(h,ctx);
- return ((l.size() > 0) ? (String)l.iterator().next() : null);
- }
- public static List<String> getHeader(String h,DMaaPContext ctx)
- {
- LinkedList list = new LinkedList();
- Enumeration e = ctx.getRequest().getHeaders(h);
- while (e.hasMoreElements())
- {
- list.add(e.nextElement().toString());
- }
- return list;
- }
-
- public static String getKafkaproperty(){
- InputStream input = new Utils().getClass().getResourceAsStream("/kafka.properties");
- Properties props = new Properties();
- try {
- props.load(input);
- } catch (IOException e) {
- log.error("failed to read kafka.properties");
- }
- return props.getProperty("key");
-
-
- }
-
- public static boolean isCadiEnabled(){
- boolean enableCadi=false;
- if(System.getenv("enableCadi")!=null&&System.getenv("enableCadi").equals("true")){
- enableCadi=true;
- }
-
- return enableCadi;
- }
-
+ public static String getRemoteAddress(DMaaPContext ctx)
+ {
+ String reqAddr = ctx.getRequest().getRemoteAddr();
+ String fwdHeader = getFirstHeader("X-Forwarded-For",ctx);
+ return ((fwdHeader != null) ? fwdHeader : reqAddr);
+ }
+ public static String getFirstHeader(String h,DMaaPContext ctx)
+ {
+ List l = getHeader(h,ctx);
+ return ((l.size() > 0) ? (String)l.iterator().next() : null);
+ }
+ public static List<String> getHeader(String h,DMaaPContext ctx)
+ {
+ LinkedList list = new LinkedList();
+ Enumeration e = ctx.getRequest().getHeaders(h);
+ while (e.hasMoreElements())
+ {
+ list.add(e.nextElement().toString());
+ }
+ return list;
+ }
+
+ public static String getKafkaproperty(){
+ InputStream input = new Utils().getClass().getResourceAsStream("/kafka.properties");
+ Properties props = new Properties();
+ try {
+ props.load(input);
+ } catch (IOException e) {
+ log.error("failed to read kafka.properties");
+ }
+ return props.getProperty("key");
+
+
+ }
+
+ public static boolean isCadiEnabled(){
+ boolean enableCadi=false;
+ if(System.getenv("enableCadi")!=null&&System.getenv("enableCadi").equals("true")){
+ enableCadi=true;
+ }
+
+ return enableCadi;
+ }
+
+ public static Properties addSaslProps(){
+ Properties props = new Properties();
+ String saslMech = System.getenv("SASLMECH");
+ if (saslMech != null && saslMech.equals("scram-sha-512")) {
+ props.put("sasl.jaas.config", System.getenv("JAASLOGIN"));
+ props.put(SASL_MECH, saslMech.toUpperCase());
+ }
+ else {
+ props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='" + getKafkaproperty() + "';");
+ props.put(SASL_MECH, "PLAIN");
+ }
+ props.put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
+ log.info("KafkaAdmin sasl.mechanism set to " + props.getProperty(SASL_MECH));
+ return props;
+
+ }
}
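
Because addSaslProps() keys entirely off the process environment, a deployment can switch mechanisms without a rebuild. An illustrative usage, with placeholder credentials (the JAAS string and user below are examples, not values from this repo):

import java.util.Properties;
import org.onap.dmaap.dmf.mr.utils.Utils;

public class SaslPropsUsage {
    public static void main(String[] args) {
        // Environment expected for the SCRAM path (set by the deployment):
        //   enableCadi=true
        //   SASLMECH=scram-sha-512
        //   JAASLOGIN=org.apache.kafka.common.security.scram.ScramLoginModule required username='someUser' password='somePassword';
        final Properties sasl = Utils.addSaslProps();
        // With the variables above: SCRAM-SHA-512, JAASLOGIN taken verbatim.
        // With SASLMECH unset: PLAIN, using the key read from kafka.properties.
        System.out.println("sasl.mechanism    = " + sasl.getProperty("sasl.mechanism"));
        System.out.println("security.protocol = " + sasl.getProperty("security.protocol"));
    }
}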