diff options
author | seanfos <sean.osullivan@est.tech> | 2021-10-06 16:09:15 +0100 |
---|---|---|
committer | seanfos <sean.osullivan@est.tech> | 2021-10-11 11:08:55 +0100 |
commit | a5b9e047b91a933ab1485011b459bfeac6e857ce (patch) | |
tree | 17071362de32ff9b4855bd1ded09ec90d57e455b /src/main/java/org | |
parent | 80adb1f3525753841a7853d245dacc894417a4f7 (diff) |
[MR] Add support for configuring jaas.sasl.config at runtime
Signed-off-by: seanfos <sean.osullivan@est.tech>
Change-Id: I92a6fdb9e375db7b355e19127a5fdbe2b4d2a827
Issue-ID: DMAAP-1653
Diffstat (limited to 'src/main/java/org')
4 files changed, 249 insertions, 239 deletions
diff --git a/src/main/java/org/onap/dmaap/dmf/mr/backends/kafka/KafkaPublisher.java b/src/main/java/org/onap/dmaap/dmf/mr/backends/kafka/KafkaPublisher.java index 62dc2a5..ac40603 100644 --- a/src/main/java/org/onap/dmaap/dmf/mr/backends/kafka/KafkaPublisher.java +++ b/src/main/java/org/onap/dmaap/dmf/mr/backends/kafka/KafkaPublisher.java @@ -8,22 +8,23 @@ * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * http://www.apache.org/licenses/LICENSE-2.0 -* + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * ============LICENSE_END========================================================= - * + * * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * + * *******************************************************************************/ package org.onap.dmaap.dmf.mr.backends.kafka; import com.att.eelf.configuration.EELFLogger; import com.att.eelf.configuration.EELFManager; import com.att.nsa.drumlin.till.nv.rrNvReadable; +import org.apache.kafka.clients.admin.AdminClientConfig; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; @@ -43,9 +44,9 @@ import java.util.Properties; /** * Sends raw JSON objects into Kafka. - * + * * Could improve space: BSON rather than JSON? - * + * * @author peter * */ @@ -53,7 +54,7 @@ import java.util.Properties; public class KafkaPublisher implements Publisher { /** * constructor initializing - * + * * @param settings * @throws rrNvReadable.missingReqdSetting */ @@ -62,35 +63,33 @@ public class KafkaPublisher implements Publisher { final Properties props = new Properties(); String kafkaConnUrl= com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"kafka.metadata.broker.list"); if(StringUtils.isEmpty(kafkaConnUrl)){ - + kafkaConnUrl="localhost:9092"; } - - - if(Utils.isCadiEnabled()){ - transferSetting( props, "sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='"+Utils.getKafkaproperty()+"';"); - transferSetting( props, "security.protocol", "SASL_PLAINTEXT"); - transferSetting( props, "sasl.mechanism", "PLAIN"); - } + + + if(Utils.isCadiEnabled()){ + props.putAll(Utils.addSaslProps()); + } transferSetting( props, "bootstrap.servers",kafkaConnUrl); - + transferSetting( props, "request.required.acks", "1"); transferSetting( props, "message.send.max.retries", "5"); - transferSetting(props, "retry.backoff.ms", "150"); + transferSetting(props, "retry.backoff.ms", "150"); + + - - props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); - - + + fProducer = new KafkaProducer<>(props); } /** * Send a message with a given topic and key. - * + * * @param msg * @throws FailedToSendMessageException * @throws JSONException @@ -102,21 +101,21 @@ public class KafkaPublisher implements Publisher { sendMessages(topic, msgs); } - /** + /** * method publishing batch messages - * This method is commented from 0.8 to 0.11 upgrade + * This method is commented from 0.8 to 0.11 upgrade * @param topic * @param kms * throws IOException * public void sendBatchMessage(String topic, ArrayList<KeyedMessage<String, String>> kms) throws IOException { - try { - fProducer.send(kms); + try { + fProducer.send(kms); - } catch (FailedToSendMessageException excp) { - log.error("Failed to send message(s) to topic [" + topic + "].", excp); - throw new FailedToSendMessageException(excp.getMessage(), excp); - } + } catch (FailedToSendMessageException excp) { + log.error("Failed to send message(s) to topic [" + topic + "].", excp); + throw new FailedToSendMessageException(excp.getMessage(), excp); + } } */ @@ -131,63 +130,63 @@ public class KafkaPublisher implements Publisher { fProducer.send(km); } - } catch (Exception excp) { + } catch (Exception excp) { log.error("Failed to send message(s) to topic [" + topic + "].", excp); throw new IOException(excp.getMessage(), excp); } } - + /** * Send a set of messages. Each must have a "key" string value. - * + * * @param topic * @param msg * @throws FailedToSendMessageException * @throws JSONException * + @Override + public void sendMessages(String topic, List<? extends message> msgs) + throws IOException, FailedToSendMessageException { + log.info("sending " + msgs.size() + " events to [" + topic + "]"); + + final List<KeyedMessage<String, String>> kms = new ArrayList<KeyedMessage<String, String>>(msgs.size()); + for (message o : msgs) { + final KeyedMessage<String, String> data = new KeyedMessage<String, String>(topic, o.getKey(), o.toString()); + kms.add(data); + } + try { + fProducer.send(kms); + + } catch (FailedToSendMessageException excp) { + log.error("Failed to send message(s) to topic [" + topic + "].", excp); + throw new FailedToSendMessageException(excp.getMessage(), excp); + } + } */ @Override - public void sendMessages(String topic, List<? extends message> msgs) - throws IOException, FailedToSendMessageException { + public void sendMessagesNew(String topic, List<? extends message> msgs) throws IOException { log.info("sending " + msgs.size() + " events to [" + topic + "]"); - - final List<KeyedMessage<String, String>> kms = new ArrayList<KeyedMessage<String, String>>(msgs.size()); - for (message o : msgs) { - final KeyedMessage<String, String> data = new KeyedMessage<String, String>(topic, o.getKey(), o.toString()); - kms.add(data); - } try { - fProducer.send(kms); - - } catch (FailedToSendMessageException excp) { - log.error("Failed to send message(s) to topic [" + topic + "].", excp); - throw new FailedToSendMessageException(excp.getMessage(), excp); + for (message o : msgs) { + final ProducerRecord<String, String> data = + new ProducerRecord<>(topic, o.getKey(), o.toString()); + fProducer.send(data); + } + } catch (Exception e) { + log.error("Failed to send message(s) to topic [" + topic + "].", e); } - } */ - @Override - public void sendMessagesNew(String topic, List<? extends message> msgs) throws IOException { - log.info("sending " + msgs.size() + " events to [" + topic + "]"); - try { - for (message o : msgs) { - final ProducerRecord<String, String> data = - new ProducerRecord<>(topic, o.getKey(), o.toString()); - fProducer.send(data); - } - } catch (Exception e) { - log.error("Failed to send message(s) to topic [" + topic + "].", e); - } - } - - + } + + private Producer<String, String> fProducer; - /** - * It sets the key value pair - * @param topic - * @param msg - * @param key - * @param defVal - */ + /** + * It sets the key value pair + * @param topic + * @param msg + * @param key + * @param defVal + */ private void transferSetting(Properties props, String key, String defVal) { String kafkaProp= com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"kafka." + key); if (StringUtils.isEmpty(kafkaProp)) kafkaProp=defVal; @@ -199,6 +198,6 @@ public class KafkaPublisher implements Publisher { @Override public void sendMessages(String topic, List<? extends message> msgs) throws IOException { // TODO Auto-generated method stub - + } } diff --git a/src/main/java/org/onap/dmaap/dmf/mr/beans/DMaaPKafkaConsumerFactory.java b/src/main/java/org/onap/dmaap/dmf/mr/beans/DMaaPKafkaConsumerFactory.java index 26a8cf4..6f5a17c 100644 --- a/src/main/java/org/onap/dmaap/dmf/mr/beans/DMaaPKafkaConsumerFactory.java +++ b/src/main/java/org/onap/dmaap/dmf/mr/beans/DMaaPKafkaConsumerFactory.java @@ -8,16 +8,16 @@ * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * http://www.apache.org/licenses/LICENSE-2.0 -* + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * ============LICENSE_END========================================================= - * + * * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * + * *******************************************************************************/ package org.onap.dmaap.dmf.mr.beans; @@ -27,6 +27,7 @@ import com.att.eelf.configuration.EELFManager; import com.att.nsa.drumlin.till.nv.rrNvReadable.missingReqdSetting; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.recipes.locks.InterProcessMutex; +import org.apache.kafka.clients.admin.AdminClientConfig; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.onap.dmaap.dmf.mr.CambriaApiException; import org.onap.dmaap.dmf.mr.backends.Consumer; @@ -52,13 +53,13 @@ import java.util.concurrent.TimeUnit; */ public class DMaaPKafkaConsumerFactory implements ConsumerFactory { - + private static final EELFLogger log = EELFManager.getInstance().getLogger(DMaaPKafkaConsumerFactory.class); - + /** * constructor initialization - * + * * @param settings * @param metrics * @param curator @@ -68,8 +69,8 @@ public class DMaaPKafkaConsumerFactory implements ConsumerFactory { */ public DMaaPKafkaConsumerFactory(@Qualifier("dMaaPMetricsSet") MetricsSet metrics, - @Qualifier("curator") CuratorFramework curator, - @Qualifier("kafkalockavoid") KafkaLiveLockAvoider2 kafkaLiveLockAvoider) + @Qualifier("curator") CuratorFramework curator, + @Qualifier("kafkalockavoid") KafkaLiveLockAvoider2 kafkaLiveLockAvoider) throws missingReqdSetting, KafkaConsumerCacheException, UnknownHostException { String apiNodeId = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, @@ -97,7 +98,7 @@ public class DMaaPKafkaConsumerFactory implements ConsumerFactory { final boolean isCacheEnabled = kSetting_EnableCache; - + fCache = null; if (isCacheEnabled) { fCache = KafkaConsumerCache.getInstance(); @@ -108,15 +109,15 @@ public class DMaaPKafkaConsumerFactory implements ConsumerFactory { fCache.setfApiId(apiNodeId); fCache.startCache(mode, curator); if(kafkaLiveLockAvoider!=null){ - kafkaLiveLockAvoider.startNewWatcherForServer(apiNodeId, makeAvoidanceCallback(apiNodeId)); - fkafkaLiveLockAvoider = kafkaLiveLockAvoider; + kafkaLiveLockAvoider.startNewWatcherForServer(apiNodeId, makeAvoidanceCallback(apiNodeId)); + fkafkaLiveLockAvoider = kafkaLiveLockAvoider; } } } /* * getConsumerFor - * + * * @see * com.att.dmf.mr.backends.ConsumerFactory#getConsumerFor(java.lang.String, * java.lang.String, java.lang.String, int, java.lang.String) This method is @@ -128,7 +129,7 @@ public class DMaaPKafkaConsumerFactory implements ConsumerFactory { */ @Override public Consumer getConsumerFor(String topic, String consumerGroupName, String consumerId, int timeoutMs, - String remotehost) throws UnavailableException, CambriaApiException { + String remotehost) throws UnavailableException, CambriaApiException { Kafka011Consumer kc; // To synchronize based on the consumer group. @@ -179,11 +180,11 @@ public class DMaaPKafkaConsumerFactory implements ConsumerFactory { log.info("Creating Kafka consumer for group [" + consumerGroupName + "], consumer [" + consumerId + "], on topic [" + topic + "]."); - + if (fCache != null) { fCache.signalOwnership(topic, consumerGroupName, consumerId); } - + final Properties props = createConsumerConfig(topic,consumerGroupName, consumerId); long fCreateTimeMs = System.currentTimeMillis(); KafkaConsumer<String, String> cc = new KafkaConsumer<>(props); @@ -210,7 +211,7 @@ public class DMaaPKafkaConsumerFactory implements ConsumerFactory { + consumerId); log.error("Failed and go to Exception block " + e.getMessage() + " " + consumerGroupName + "/" + consumerId); - + } finally { if (locked) { try { @@ -242,7 +243,7 @@ public class DMaaPKafkaConsumerFactory implements ConsumerFactory { fCache.dropAllConsumers(); } - + private KafkaConsumerCache fCache; private KafkaLiveLockAvoider2 fkafkaLiveLockAvoider; private String fkafkaBrokers; @@ -256,38 +257,36 @@ public class DMaaPKafkaConsumerFactory implements ConsumerFactory { private void transferSettingIfProvided(Properties target, String key, String prefix) { String keyVal = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, makeLongKey(key, prefix)); - + if (null != keyVal) { - + log.info("Setting [" + key + "] to " + keyVal + "."); target.put(key, keyVal); } } /** - * Name CreateConsumerconfig + * Name CreateConsumerconfig * @param topic * @param groupId * @param consumerId * @return Properties - * + * * This method is to create Properties required to create kafka connection - * Group name is replaced with different format groupid--topic to address same - * groupids for multiple topics. Same groupid with multiple topics + * Group name is replaced with different format groupid--topic to address same + * groupids for multiple topics. Same groupid with multiple topics * may start frequent consumer rebalancing on all the topics . Replacing them makes it unique */ private Properties createConsumerConfig(String topic ,String groupId, String consumerId) { final Properties props = new Properties(); //fakeGroupName is added to avoid multiple consumer group for multiple topics.Donot Change this logic - //Fix for CPFMF-644 : + //Fix for CPFMF-644 : final String fakeGroupName = groupId + "--" + topic; props.put("group.id", fakeGroupName); props.put("enable.auto.commit", "false"); // 0.11 props.put("bootstrap.servers", fkafkaBrokers); if(Utils.isCadiEnabled()){ - props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='"+Utils.getKafkaproperty()+"';"); - props.put("security.protocol", "SASL_PLAINTEXT"); - props.put("sasl.mechanism", "PLAIN"); + props.putAll(Utils.addSaslProps()); } props.put("client.id", consumerId); @@ -314,7 +313,7 @@ public class DMaaPKafkaConsumerFactory implements ConsumerFactory { /** * putting values in hashmap like consumer timeout, zookeeper time out, etc - * + * * @param setting */ private static void populateKafkaInternalDefaultsMap() { } @@ -322,7 +321,7 @@ public class DMaaPKafkaConsumerFactory implements ConsumerFactory { /* * The starterIncremnt value is just to emulate calling certain consumers, * in this test app all the consumers are local - * + * */ private LiveLockAvoidance makeAvoidanceCallback(final String appId) { @@ -346,7 +345,7 @@ public class DMaaPKafkaConsumerFactory implements ConsumerFactory { @SuppressWarnings("rawtypes") @Override public HashMap getConsumerForKafka011(String topic, String consumerGroupName, String consumerId, int timeoutMs, - String remotehost) throws UnavailableException, CambriaApiException { + String remotehost) throws UnavailableException, CambriaApiException { // TODO Auto-generated method stub return null; } diff --git a/src/main/java/org/onap/dmaap/dmf/mr/beans/DMaaPKafkaMetaBroker.java b/src/main/java/org/onap/dmaap/dmf/mr/beans/DMaaPKafkaMetaBroker.java index 9ab4c83..7a08345 100644 --- a/src/main/java/org/onap/dmaap/dmf/mr/beans/DMaaPKafkaMetaBroker.java +++ b/src/main/java/org/onap/dmaap/dmf/mr/beans/DMaaPKafkaMetaBroker.java @@ -8,16 +8,16 @@ * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * http://www.apache.org/licenses/LICENSE-2.0 -* + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * ============LICENSE_END========================================================= - * + * * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * + * *******************************************************************************/ package org.onap.dmaap.dmf.mr.beans; @@ -55,7 +55,7 @@ import java.util.concurrent.ExecutionException; /** * class performing all topic operations - * + * * @author anowarul.islam * */ @@ -73,32 +73,29 @@ public class DMaaPKafkaMetaBroker implements Broker1 { fkafkaBrokers = "localhost:9092"; } - - props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, fkafkaBrokers ); - if(Utils.isCadiEnabled()){ - props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='"+Utils.getKafkaproperty()+"';"); - props.put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT"); - props.put("sasl.mechanism", "PLAIN"); - } - - fKafkaAdminClient=AdminClient.create ( props ); - + + props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, fkafkaBrokers ); + if(Utils.isCadiEnabled()){ + props.putAll(Utils.addSaslProps()); + } + fKafkaAdminClient=AdminClient.create ( props ); + } private static final EELFLogger log = EELFManager.getInstance().getLogger(ConfigurationReader.class); private final AdminClient fKafkaAdminClient; - - + + /** * DMaaPKafkaMetaBroker constructor initializing - * + * * @param settings * @param zk * @param configDb */ public DMaaPKafkaMetaBroker(@Qualifier("propertyReader") rrNvReadable settings, - @Qualifier("dMaaPZkClient") ZkClient zk, @Qualifier("dMaaPZkConfigDb") ConfigDb configDb) { + @Qualifier("dMaaPZkClient") ZkClient zk, @Qualifier("dMaaPZkConfigDb") ConfigDb configDb) { fZk = zk; fCambriaConfig = configDb; fBaseTopicData = configDb.parse("/topics"); @@ -109,30 +106,28 @@ public class DMaaPKafkaMetaBroker implements Broker1 { fkafkaBrokers = "localhost:9092"; } - - if(Utils.isCadiEnabled()){ - props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='"+Utils.getKafkaproperty()+"';"); - props.put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT"); - props.put("sasl.mechanism", "PLAIN"); - } - props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, fkafkaBrokers ); - - fKafkaAdminClient=AdminClient.create ( props ); - - - + + if(Utils.isCadiEnabled()){ + props.putAll(Utils.addSaslProps()); + } + props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, fkafkaBrokers ); + + fKafkaAdminClient=AdminClient.create ( props ); + + + } - + public DMaaPKafkaMetaBroker( rrNvReadable settings, - ZkClient zk, ConfigDb configDb,AdminClient client) { - + ZkClient zk, ConfigDb configDb,AdminClient client) { + fZk = zk; fCambriaConfig = configDb; fBaseTopicData = configDb.parse("/topics"); - fKafkaAdminClient= client; - - - + fKafkaAdminClient= client; + + + } @Override @@ -169,7 +164,7 @@ public class DMaaPKafkaMetaBroker implements Broker1 { /** * static method get KafkaTopic object - * + * * @param db * @param base * @param topic @@ -185,7 +180,7 @@ public class DMaaPKafkaMetaBroker implements Broker1 { */ @Override public Topic createTopic(String topic, String desc, String ownerApiKey, int partitions, int replicas, - boolean transactionEnabled) throws TopicExistsException, CambriaApiException,ConfigDbException { + boolean transactionEnabled) throws TopicExistsException, CambriaApiException,ConfigDbException { log.info("Creating topic: " + topic); try { log.info("Check if topic [" + topic + "] exist."); @@ -216,23 +211,23 @@ public class DMaaPKafkaMetaBroker implements Broker1 { // create via kafka - try { - final NewTopic topicRequest = - new NewTopic(topic, partitions, (short)replicas); - final CreateTopicsResult ctr = - fKafkaAdminClient.createTopics(Arrays.asList(topicRequest)); - final KafkaFuture<Void> ctrResult = ctr.all(); - ctrResult.get(); - // underlying Kafka topic created. now setup our API info - return createTopicEntry(topic, desc, ownerApiKey, transactionEnabled); - } catch (InterruptedException e) { - log.warn("Execution of describeTopics timed out."); - throw new ConfigDbException(e); - } catch (ExecutionException e) { - log.warn("Execution of describeTopics failed: " + e.getCause().getMessage(), e); - throw new ConfigDbException(e.getCause()); - } - + try { + final NewTopic topicRequest = + new NewTopic(topic, partitions, (short)replicas); + final CreateTopicsResult ctr = + fKafkaAdminClient.createTopics(Arrays.asList(topicRequest)); + final KafkaFuture<Void> ctrResult = ctr.all(); + ctrResult.get(); + // underlying Kafka topic created. now setup our API info + return createTopicEntry(topic, desc, ownerApiKey, transactionEnabled); + } catch (InterruptedException e) { + log.warn("Execution of describeTopics timed out."); + throw new ConfigDbException(e); + } catch (ExecutionException e) { + log.warn("Execution of describeTopics failed: " + e.getCause().getMessage(), e); + throw new ConfigDbException(e.getCause()); + } + } @Override @@ -240,13 +235,13 @@ public class DMaaPKafkaMetaBroker implements Broker1 { log.info("Deleting topic: " + topic); try { log.info("Loading zookeeper client for topic deletion."); - // topic creation. (Otherwise, the topic is only partially created + // topic creation. (Otherwise, the topic is only partially created // in ZK.) - - + + fKafkaAdminClient.deleteTopics(Arrays.asList(topic)); log.info("Zookeeper client loaded successfully. Deleting topic."); - + } catch (Exception e) { log.error("Failed to delete topic [" + topic + "]. " + e.getMessage(), e); throw new ConfigDbException(e); @@ -265,7 +260,7 @@ public class DMaaPKafkaMetaBroker implements Broker1 { /** * method Providing KafkaTopic Object associated with owner and * transactionenabled or not - * + * * @param name * @param desc * @param owner @@ -280,7 +275,7 @@ public class DMaaPKafkaMetaBroker implements Broker1 { /** * static method giving kafka topic object - * + * * @param db * @param basePath * @param name @@ -291,7 +286,7 @@ public class DMaaPKafkaMetaBroker implements Broker1 { * @throws ConfigDbException */ public static KafkaTopic createTopicEntry(ConfigDb db, ConfigPath basePath, String name, String desc, String owner, - boolean transactionEnabled) throws ConfigDbException { + boolean transactionEnabled) throws ConfigDbException { final JSONObject o = new JSONObject(); o.put("owner", owner); o.put("description", desc); @@ -303,14 +298,14 @@ public class DMaaPKafkaMetaBroker implements Broker1 { /** * class performing all user opearation like user is eligible to read, * write. permitting a user to write and read, - * + * * @author anowarul.islam * */ public static class KafkaTopic implements Topic { /** * constructor initializes - * + * * @param name * @param configdb * @param baseTopic @@ -330,26 +325,26 @@ public class DMaaPKafkaMetaBroker implements Broker1 { fOwner = o.optString("owner", ""); fDesc = o.optString("description", ""); fTransactionEnabled = o.optBoolean("txenabled", false);// default - // value is - // false + // value is + // false // if this topic has an owner, it needs both read/write ACLs. If there's no - // owner (or it's empty), null is okay -- this is for existing or implicitly - // created topics. - JSONObject readers = o.optJSONObject ( "readers" ); - if ( readers == null && fOwner.length () > 0 ) - { - readers = kEmptyAcl; - } - fReaders = fromJson ( readers ); - - JSONObject writers = o.optJSONObject ( "writers" ); - if ( writers == null && fOwner.length () > 0 ) - { - writers = kEmptyAcl; - } - fWriters = fromJson ( writers ); + // owner (or it's empty), null is okay -- this is for existing or implicitly + // created topics. + JSONObject readers = o.optJSONObject ( "readers" ); + if ( readers == null && fOwner.length () > 0 ) + { + readers = kEmptyAcl; + } + fReaders = fromJson ( readers ); + + JSONObject writers = o.optJSONObject ( "writers" ); + if ( writers == null && fOwner.length () > 0 ) + { + writers = kEmptyAcl; + } + fWriters = fromJson ( writers ); } - + private NsaAcl fromJson(JSONObject o) { NsaAcl acl = new NsaAcl(); if (o != null) { @@ -427,7 +422,7 @@ public class DMaaPKafkaMetaBroker implements Broker1 { try { final NsaAcl acl = NsaAclUtils.updateAcl ( this, asUser, key, reader, add ); - + // we have to assume we have current data, or load it again. for the expected use // case, assuming we can overwrite the data is fine. final JSONObject o = new JSONObject (); @@ -435,15 +430,15 @@ public class DMaaPKafkaMetaBroker implements Broker1 { o.put ( "readers", safeSerialize ( reader ? acl : fReaders ) ); o.put ( "writers", safeSerialize ( reader ? fWriters : acl ) ); fConfigDb.store ( fBaseTopicData.getChild ( fName ), o.toString () ); - + log.info ( "ACL_UPDATE: " + asUser.getKey () + " " + ( add ? "added" : "removed" ) + ( reader?"subscriber":"publisher" ) + " " + key + " on " + fName ); - + } catch ( ConfigDbException | AccessDeniedException x ) { throw x; } - + } private JSONObject safeSerialize(NsaAcl acl) { @@ -458,7 +453,7 @@ public class DMaaPKafkaMetaBroker implements Broker1 { private final NsaAcl fReaders; private final NsaAcl fWriters; private boolean fTransactionEnabled; - + public boolean isTransactionEnabled() { return fTransactionEnabled; } diff --git a/src/main/java/org/onap/dmaap/dmf/mr/utils/Utils.java b/src/main/java/org/onap/dmaap/dmf/mr/utils/Utils.java index 662f0f7..c420072 100644 --- a/src/main/java/org/onap/dmaap/dmf/mr/utils/Utils.java +++ b/src/main/java/org/onap/dmaap/dmf/mr/utils/Utils.java @@ -8,30 +8,30 @@ * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * http://www.apache.org/licenses/LICENSE-2.0 -* + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * ============LICENSE_END========================================================= - * + * * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * + * *******************************************************************************/ package org.onap.dmaap.dmf.mr.utils; import com.att.eelf.configuration.EELFLogger; import com.att.eelf.configuration.EELFManager; -import org.onap.dmaap.dmf.mr.beans.DMaaPContext; - -import javax.servlet.http.HttpServletRequest; import java.io.IOException; import java.io.InputStream; import java.security.Principal; import java.text.DecimalFormat; import java.text.SimpleDateFormat; import java.util.*; +import javax.servlet.http.HttpServletRequest; +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.onap.dmaap.dmf.mr.beans.DMaaPContext; /** * This is an utility class for various operations for formatting @@ -46,13 +46,14 @@ public class Utils { private static final String BATCH_ID_FORMAT = "000000"; private static final String X509_ATTR = "javax.servlet.request.X509Certificate"; private static final EELFLogger log = EELFManager.getInstance().getLogger(Utils.class); + public static final String SASL_MECH = "sasl.mechanism"; private Utils() { super(); } /** - * Formatting the date + * Formatting the date * @param date * @return date or null */ @@ -128,55 +129,71 @@ public class Utils { */ public static long getSleepMsForRate ( double ratePerMinute ) { - if ( ratePerMinute <= 0.0 ) + if ( ratePerMinute <= 0.0 ) { return 0; } return Math.max ( 1000, Math.round ( 60 * 1000 / ratePerMinute ) ); } - public static String getRemoteAddress(DMaaPContext ctx) - { - String reqAddr = ctx.getRequest().getRemoteAddr(); - String fwdHeader = getFirstHeader("X-Forwarded-For",ctx); - return ((fwdHeader != null) ? fwdHeader : reqAddr); - } - public static String getFirstHeader(String h,DMaaPContext ctx) - { - List l = getHeader(h,ctx); - return ((l.size() > 0) ? (String)l.iterator().next() : null); - } - public static List<String> getHeader(String h,DMaaPContext ctx) - { - LinkedList list = new LinkedList(); - Enumeration e = ctx.getRequest().getHeaders(h); - while (e.hasMoreElements()) - { - list.add(e.nextElement().toString()); - } - return list; - } - - public static String getKafkaproperty(){ - InputStream input = new Utils().getClass().getResourceAsStream("/kafka.properties"); - Properties props = new Properties(); - try { - props.load(input); - } catch (IOException e) { - log.error("failed to read kafka.properties"); - } - return props.getProperty("key"); - - - } - - public static boolean isCadiEnabled(){ - boolean enableCadi=false; - if(System.getenv("enableCadi")!=null&&System.getenv("enableCadi").equals("true")){ - enableCadi=true; - } - - return enableCadi; - } - + public static String getRemoteAddress(DMaaPContext ctx) + { + String reqAddr = ctx.getRequest().getRemoteAddr(); + String fwdHeader = getFirstHeader("X-Forwarded-For",ctx); + return ((fwdHeader != null) ? fwdHeader : reqAddr); + } + public static String getFirstHeader(String h,DMaaPContext ctx) + { + List l = getHeader(h,ctx); + return ((l.size() > 0) ? (String)l.iterator().next() : null); + } + public static List<String> getHeader(String h,DMaaPContext ctx) + { + LinkedList list = new LinkedList(); + Enumeration e = ctx.getRequest().getHeaders(h); + while (e.hasMoreElements()) + { + list.add(e.nextElement().toString()); + } + return list; + } + + public static String getKafkaproperty(){ + InputStream input = new Utils().getClass().getResourceAsStream("/kafka.properties"); + Properties props = new Properties(); + try { + props.load(input); + } catch (IOException e) { + log.error("failed to read kafka.properties"); + } + return props.getProperty("key"); + + + } + + public static boolean isCadiEnabled(){ + boolean enableCadi=false; + if(System.getenv("enableCadi")!=null&&System.getenv("enableCadi").equals("true")){ + enableCadi=true; + } + + return enableCadi; + } + + public static Properties addSaslProps(){ + Properties props = new Properties(); + String saslMech = System.getenv("SASLMECH"); + if (saslMech != null && saslMech.equals("scram-sha-512")) { + props.put("sasl.jaas.config", System.getenv("JAASLOGIN")); + props.put(SASL_MECH, saslMech.toUpperCase()); + } + else { + props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='" + getKafkaproperty() + "';"); + props.put(SASL_MECH, "PLAIN"); + } + props.put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT"); + log.info("KafkaAdmin sasl.mechanism set to " + props.getProperty(SASL_MECH)); + return props; + + } } |