diff options
8 files changed, 379 insertions, 337 deletions
@@ -719,6 +719,12 @@ <artifactId>slf4j-api</artifactId> <version>1.7.32</version> </dependency> + <dependency> + <groupId>com.github.stefanbirkner</groupId> + <artifactId>system-rules</artifactId> + <version>1.17.2</version> + <scope>test</scope> + </dependency> </dependencies> <profiles> <!-- Use this profile to run the AJSC locally. This profile can be successfully diff --git a/src/main/java/org/onap/dmaap/dmf/mr/backends/kafka/KafkaPublisher.java b/src/main/java/org/onap/dmaap/dmf/mr/backends/kafka/KafkaPublisher.java index 62dc2a5..ac40603 100644 --- a/src/main/java/org/onap/dmaap/dmf/mr/backends/kafka/KafkaPublisher.java +++ b/src/main/java/org/onap/dmaap/dmf/mr/backends/kafka/KafkaPublisher.java @@ -8,22 +8,23 @@ * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * http://www.apache.org/licenses/LICENSE-2.0 -* + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * ============LICENSE_END========================================================= - * + * * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * + * *******************************************************************************/ package org.onap.dmaap.dmf.mr.backends.kafka; import com.att.eelf.configuration.EELFLogger; import com.att.eelf.configuration.EELFManager; import com.att.nsa.drumlin.till.nv.rrNvReadable; +import org.apache.kafka.clients.admin.AdminClientConfig; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; @@ -43,9 +44,9 @@ import java.util.Properties; /** * Sends raw JSON objects into Kafka. - * + * * Could improve space: BSON rather than JSON? - * + * * @author peter * */ @@ -53,7 +54,7 @@ import java.util.Properties; public class KafkaPublisher implements Publisher { /** * constructor initializing - * + * * @param settings * @throws rrNvReadable.missingReqdSetting */ @@ -62,35 +63,33 @@ public class KafkaPublisher implements Publisher { final Properties props = new Properties(); String kafkaConnUrl= com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"kafka.metadata.broker.list"); if(StringUtils.isEmpty(kafkaConnUrl)){ - + kafkaConnUrl="localhost:9092"; } - - - if(Utils.isCadiEnabled()){ - transferSetting( props, "sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='"+Utils.getKafkaproperty()+"';"); - transferSetting( props, "security.protocol", "SASL_PLAINTEXT"); - transferSetting( props, "sasl.mechanism", "PLAIN"); - } + + + if(Utils.isCadiEnabled()){ + props.putAll(Utils.addSaslProps()); + } transferSetting( props, "bootstrap.servers",kafkaConnUrl); - + transferSetting( props, "request.required.acks", "1"); transferSetting( props, "message.send.max.retries", "5"); - transferSetting(props, "retry.backoff.ms", "150"); + transferSetting(props, "retry.backoff.ms", "150"); + + - - props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); - - + + fProducer = new KafkaProducer<>(props); } /** * Send a message with a given topic and key. - * + * * @param msg * @throws FailedToSendMessageException * @throws JSONException @@ -102,21 +101,21 @@ public class KafkaPublisher implements Publisher { sendMessages(topic, msgs); } - /** + /** * method publishing batch messages - * This method is commented from 0.8 to 0.11 upgrade + * This method is commented from 0.8 to 0.11 upgrade * @param topic * @param kms * throws IOException * public void sendBatchMessage(String topic, ArrayList<KeyedMessage<String, String>> kms) throws IOException { - try { - fProducer.send(kms); + try { + fProducer.send(kms); - } catch (FailedToSendMessageException excp) { - log.error("Failed to send message(s) to topic [" + topic + "].", excp); - throw new FailedToSendMessageException(excp.getMessage(), excp); - } + } catch (FailedToSendMessageException excp) { + log.error("Failed to send message(s) to topic [" + topic + "].", excp); + throw new FailedToSendMessageException(excp.getMessage(), excp); + } } */ @@ -131,63 +130,63 @@ public class KafkaPublisher implements Publisher { fProducer.send(km); } - } catch (Exception excp) { + } catch (Exception excp) { log.error("Failed to send message(s) to topic [" + topic + "].", excp); throw new IOException(excp.getMessage(), excp); } } - + /** * Send a set of messages. Each must have a "key" string value. - * + * * @param topic * @param msg * @throws FailedToSendMessageException * @throws JSONException * + @Override + public void sendMessages(String topic, List<? extends message> msgs) + throws IOException, FailedToSendMessageException { + log.info("sending " + msgs.size() + " events to [" + topic + "]"); + + final List<KeyedMessage<String, String>> kms = new ArrayList<KeyedMessage<String, String>>(msgs.size()); + for (message o : msgs) { + final KeyedMessage<String, String> data = new KeyedMessage<String, String>(topic, o.getKey(), o.toString()); + kms.add(data); + } + try { + fProducer.send(kms); + + } catch (FailedToSendMessageException excp) { + log.error("Failed to send message(s) to topic [" + topic + "].", excp); + throw new FailedToSendMessageException(excp.getMessage(), excp); + } + } */ @Override - public void sendMessages(String topic, List<? extends message> msgs) - throws IOException, FailedToSendMessageException { + public void sendMessagesNew(String topic, List<? extends message> msgs) throws IOException { log.info("sending " + msgs.size() + " events to [" + topic + "]"); - - final List<KeyedMessage<String, String>> kms = new ArrayList<KeyedMessage<String, String>>(msgs.size()); - for (message o : msgs) { - final KeyedMessage<String, String> data = new KeyedMessage<String, String>(topic, o.getKey(), o.toString()); - kms.add(data); - } try { - fProducer.send(kms); - - } catch (FailedToSendMessageException excp) { - log.error("Failed to send message(s) to topic [" + topic + "].", excp); - throw new FailedToSendMessageException(excp.getMessage(), excp); + for (message o : msgs) { + final ProducerRecord<String, String> data = + new ProducerRecord<>(topic, o.getKey(), o.toString()); + fProducer.send(data); + } + } catch (Exception e) { + log.error("Failed to send message(s) to topic [" + topic + "].", e); } - } */ - @Override - public void sendMessagesNew(String topic, List<? extends message> msgs) throws IOException { - log.info("sending " + msgs.size() + " events to [" + topic + "]"); - try { - for (message o : msgs) { - final ProducerRecord<String, String> data = - new ProducerRecord<>(topic, o.getKey(), o.toString()); - fProducer.send(data); - } - } catch (Exception e) { - log.error("Failed to send message(s) to topic [" + topic + "].", e); - } - } - - + } + + private Producer<String, String> fProducer; - /** - * It sets the key value pair - * @param topic - * @param msg - * @param key - * @param defVal - */ + /** + * It sets the key value pair + * @param topic + * @param msg + * @param key + * @param defVal + */ private void transferSetting(Properties props, String key, String defVal) { String kafkaProp= com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"kafka." + key); if (StringUtils.isEmpty(kafkaProp)) kafkaProp=defVal; @@ -199,6 +198,6 @@ public class KafkaPublisher implements Publisher { @Override public void sendMessages(String topic, List<? extends message> msgs) throws IOException { // TODO Auto-generated method stub - + } } diff --git a/src/main/java/org/onap/dmaap/dmf/mr/beans/DMaaPKafkaConsumerFactory.java b/src/main/java/org/onap/dmaap/dmf/mr/beans/DMaaPKafkaConsumerFactory.java index 26a8cf4..6f5a17c 100644 --- a/src/main/java/org/onap/dmaap/dmf/mr/beans/DMaaPKafkaConsumerFactory.java +++ b/src/main/java/org/onap/dmaap/dmf/mr/beans/DMaaPKafkaConsumerFactory.java @@ -8,16 +8,16 @@ * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * http://www.apache.org/licenses/LICENSE-2.0 -* + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * ============LICENSE_END========================================================= - * + * * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * + * *******************************************************************************/ package org.onap.dmaap.dmf.mr.beans; @@ -27,6 +27,7 @@ import com.att.eelf.configuration.EELFManager; import com.att.nsa.drumlin.till.nv.rrNvReadable.missingReqdSetting; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.recipes.locks.InterProcessMutex; +import org.apache.kafka.clients.admin.AdminClientConfig; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.onap.dmaap.dmf.mr.CambriaApiException; import org.onap.dmaap.dmf.mr.backends.Consumer; @@ -52,13 +53,13 @@ import java.util.concurrent.TimeUnit; */ public class DMaaPKafkaConsumerFactory implements ConsumerFactory { - + private static final EELFLogger log = EELFManager.getInstance().getLogger(DMaaPKafkaConsumerFactory.class); - + /** * constructor initialization - * + * * @param settings * @param metrics * @param curator @@ -68,8 +69,8 @@ public class DMaaPKafkaConsumerFactory implements ConsumerFactory { */ public DMaaPKafkaConsumerFactory(@Qualifier("dMaaPMetricsSet") MetricsSet metrics, - @Qualifier("curator") CuratorFramework curator, - @Qualifier("kafkalockavoid") KafkaLiveLockAvoider2 kafkaLiveLockAvoider) + @Qualifier("curator") CuratorFramework curator, + @Qualifier("kafkalockavoid") KafkaLiveLockAvoider2 kafkaLiveLockAvoider) throws missingReqdSetting, KafkaConsumerCacheException, UnknownHostException { String apiNodeId = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, @@ -97,7 +98,7 @@ public class DMaaPKafkaConsumerFactory implements ConsumerFactory { final boolean isCacheEnabled = kSetting_EnableCache; - + fCache = null; if (isCacheEnabled) { fCache = KafkaConsumerCache.getInstance(); @@ -108,15 +109,15 @@ public class DMaaPKafkaConsumerFactory implements ConsumerFactory { fCache.setfApiId(apiNodeId); fCache.startCache(mode, curator); if(kafkaLiveLockAvoider!=null){ - kafkaLiveLockAvoider.startNewWatcherForServer(apiNodeId, makeAvoidanceCallback(apiNodeId)); - fkafkaLiveLockAvoider = kafkaLiveLockAvoider; + kafkaLiveLockAvoider.startNewWatcherForServer(apiNodeId, makeAvoidanceCallback(apiNodeId)); + fkafkaLiveLockAvoider = kafkaLiveLockAvoider; } } } /* * getConsumerFor - * + * * @see * com.att.dmf.mr.backends.ConsumerFactory#getConsumerFor(java.lang.String, * java.lang.String, java.lang.String, int, java.lang.String) This method is @@ -128,7 +129,7 @@ public class DMaaPKafkaConsumerFactory implements ConsumerFactory { */ @Override public Consumer getConsumerFor(String topic, String consumerGroupName, String consumerId, int timeoutMs, - String remotehost) throws UnavailableException, CambriaApiException { + String remotehost) throws UnavailableException, CambriaApiException { Kafka011Consumer kc; // To synchronize based on the consumer group. @@ -179,11 +180,11 @@ public class DMaaPKafkaConsumerFactory implements ConsumerFactory { log.info("Creating Kafka consumer for group [" + consumerGroupName + "], consumer [" + consumerId + "], on topic [" + topic + "]."); - + if (fCache != null) { fCache.signalOwnership(topic, consumerGroupName, consumerId); } - + final Properties props = createConsumerConfig(topic,consumerGroupName, consumerId); long fCreateTimeMs = System.currentTimeMillis(); KafkaConsumer<String, String> cc = new KafkaConsumer<>(props); @@ -210,7 +211,7 @@ public class DMaaPKafkaConsumerFactory implements ConsumerFactory { + consumerId); log.error("Failed and go to Exception block " + e.getMessage() + " " + consumerGroupName + "/" + consumerId); - + } finally { if (locked) { try { @@ -242,7 +243,7 @@ public class DMaaPKafkaConsumerFactory implements ConsumerFactory { fCache.dropAllConsumers(); } - + private KafkaConsumerCache fCache; private KafkaLiveLockAvoider2 fkafkaLiveLockAvoider; private String fkafkaBrokers; @@ -256,38 +257,36 @@ public class DMaaPKafkaConsumerFactory implements ConsumerFactory { private void transferSettingIfProvided(Properties target, String key, String prefix) { String keyVal = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, makeLongKey(key, prefix)); - + if (null != keyVal) { - + log.info("Setting [" + key + "] to " + keyVal + "."); target.put(key, keyVal); } } /** - * Name CreateConsumerconfig + * Name CreateConsumerconfig * @param topic * @param groupId * @param consumerId * @return Properties - * + * * This method is to create Properties required to create kafka connection - * Group name is replaced with different format groupid--topic to address same - * groupids for multiple topics. Same groupid with multiple topics + * Group name is replaced with different format groupid--topic to address same + * groupids for multiple topics. Same groupid with multiple topics * may start frequent consumer rebalancing on all the topics . Replacing them makes it unique */ private Properties createConsumerConfig(String topic ,String groupId, String consumerId) { final Properties props = new Properties(); //fakeGroupName is added to avoid multiple consumer group for multiple topics.Donot Change this logic - //Fix for CPFMF-644 : + //Fix for CPFMF-644 : final String fakeGroupName = groupId + "--" + topic; props.put("group.id", fakeGroupName); props.put("enable.auto.commit", "false"); // 0.11 props.put("bootstrap.servers", fkafkaBrokers); if(Utils.isCadiEnabled()){ - props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='"+Utils.getKafkaproperty()+"';"); - props.put("security.protocol", "SASL_PLAINTEXT"); - props.put("sasl.mechanism", "PLAIN"); + props.putAll(Utils.addSaslProps()); } props.put("client.id", consumerId); @@ -314,7 +313,7 @@ public class DMaaPKafkaConsumerFactory implements ConsumerFactory { /** * putting values in hashmap like consumer timeout, zookeeper time out, etc - * + * * @param setting */ private static void populateKafkaInternalDefaultsMap() { } @@ -322,7 +321,7 @@ public class DMaaPKafkaConsumerFactory implements ConsumerFactory { /* * The starterIncremnt value is just to emulate calling certain consumers, * in this test app all the consumers are local - * + * */ private LiveLockAvoidance makeAvoidanceCallback(final String appId) { @@ -346,7 +345,7 @@ public class DMaaPKafkaConsumerFactory implements ConsumerFactory { @SuppressWarnings("rawtypes") @Override public HashMap getConsumerForKafka011(String topic, String consumerGroupName, String consumerId, int timeoutMs, - String remotehost) throws UnavailableException, CambriaApiException { + String remotehost) throws UnavailableException, CambriaApiException { // TODO Auto-generated method stub return null; } diff --git a/src/main/java/org/onap/dmaap/dmf/mr/beans/DMaaPKafkaMetaBroker.java b/src/main/java/org/onap/dmaap/dmf/mr/beans/DMaaPKafkaMetaBroker.java index 9ab4c83..7a08345 100644 --- a/src/main/java/org/onap/dmaap/dmf/mr/beans/DMaaPKafkaMetaBroker.java +++ b/src/main/java/org/onap/dmaap/dmf/mr/beans/DMaaPKafkaMetaBroker.java @@ -8,16 +8,16 @@ * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * http://www.apache.org/licenses/LICENSE-2.0 -* + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * ============LICENSE_END========================================================= - * + * * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * + * *******************************************************************************/ package org.onap.dmaap.dmf.mr.beans; @@ -55,7 +55,7 @@ import java.util.concurrent.ExecutionException; /** * class performing all topic operations - * + * * @author anowarul.islam * */ @@ -73,32 +73,29 @@ public class DMaaPKafkaMetaBroker implements Broker1 { fkafkaBrokers = "localhost:9092"; } - - props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, fkafkaBrokers ); - if(Utils.isCadiEnabled()){ - props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='"+Utils.getKafkaproperty()+"';"); - props.put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT"); - props.put("sasl.mechanism", "PLAIN"); - } - - fKafkaAdminClient=AdminClient.create ( props ); - + + props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, fkafkaBrokers ); + if(Utils.isCadiEnabled()){ + props.putAll(Utils.addSaslProps()); + } + fKafkaAdminClient=AdminClient.create ( props ); + } private static final EELFLogger log = EELFManager.getInstance().getLogger(ConfigurationReader.class); private final AdminClient fKafkaAdminClient; - - + + /** * DMaaPKafkaMetaBroker constructor initializing - * + * * @param settings * @param zk * @param configDb */ public DMaaPKafkaMetaBroker(@Qualifier("propertyReader") rrNvReadable settings, - @Qualifier("dMaaPZkClient") ZkClient zk, @Qualifier("dMaaPZkConfigDb") ConfigDb configDb) { + @Qualifier("dMaaPZkClient") ZkClient zk, @Qualifier("dMaaPZkConfigDb") ConfigDb configDb) { fZk = zk; fCambriaConfig = configDb; fBaseTopicData = configDb.parse("/topics"); @@ -109,30 +106,28 @@ public class DMaaPKafkaMetaBroker implements Broker1 { fkafkaBrokers = "localhost:9092"; } - - if(Utils.isCadiEnabled()){ - props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='"+Utils.getKafkaproperty()+"';"); - props.put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT"); - props.put("sasl.mechanism", "PLAIN"); - } - props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, fkafkaBrokers ); - - fKafkaAdminClient=AdminClient.create ( props ); - - - + + if(Utils.isCadiEnabled()){ + props.putAll(Utils.addSaslProps()); + } + props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, fkafkaBrokers ); + + fKafkaAdminClient=AdminClient.create ( props ); + + + } - + public DMaaPKafkaMetaBroker( rrNvReadable settings, - ZkClient zk, ConfigDb configDb,AdminClient client) { - + ZkClient zk, ConfigDb configDb,AdminClient client) { + fZk = zk; fCambriaConfig = configDb; fBaseTopicData = configDb.parse("/topics"); - fKafkaAdminClient= client; - - - + fKafkaAdminClient= client; + + + } @Override @@ -169,7 +164,7 @@ public class DMaaPKafkaMetaBroker implements Broker1 { /** * static method get KafkaTopic object - * + * * @param db * @param base * @param topic @@ -185,7 +180,7 @@ public class DMaaPKafkaMetaBroker implements Broker1 { */ @Override public Topic createTopic(String topic, String desc, String ownerApiKey, int partitions, int replicas, - boolean transactionEnabled) throws TopicExistsException, CambriaApiException,ConfigDbException { + boolean transactionEnabled) throws TopicExistsException, CambriaApiException,ConfigDbException { log.info("Creating topic: " + topic); try { log.info("Check if topic [" + topic + "] exist."); @@ -216,23 +211,23 @@ public class DMaaPKafkaMetaBroker implements Broker1 { // create via kafka - try { - final NewTopic topicRequest = - new NewTopic(topic, partitions, (short)replicas); - final CreateTopicsResult ctr = - fKafkaAdminClient.createTopics(Arrays.asList(topicRequest)); - final KafkaFuture<Void> ctrResult = ctr.all(); - ctrResult.get(); - // underlying Kafka topic created. now setup our API info - return createTopicEntry(topic, desc, ownerApiKey, transactionEnabled); - } catch (InterruptedException e) { - log.warn("Execution of describeTopics timed out."); - throw new ConfigDbException(e); - } catch (ExecutionException e) { - log.warn("Execution of describeTopics failed: " + e.getCause().getMessage(), e); - throw new ConfigDbException(e.getCause()); - } - + try { + final NewTopic topicRequest = + new NewTopic(topic, partitions, (short)replicas); + final CreateTopicsResult ctr = + fKafkaAdminClient.createTopics(Arrays.asList(topicRequest)); + final KafkaFuture<Void> ctrResult = ctr.all(); + ctrResult.get(); + // underlying Kafka topic created. now setup our API info + return createTopicEntry(topic, desc, ownerApiKey, transactionEnabled); + } catch (InterruptedException e) { + log.warn("Execution of describeTopics timed out."); + throw new ConfigDbException(e); + } catch (ExecutionException e) { + log.warn("Execution of describeTopics failed: " + e.getCause().getMessage(), e); + throw new ConfigDbException(e.getCause()); + } + } @Override @@ -240,13 +235,13 @@ public class DMaaPKafkaMetaBroker implements Broker1 { log.info("Deleting topic: " + topic); try { log.info("Loading zookeeper client for topic deletion."); - // topic creation. (Otherwise, the topic is only partially created + // topic creation. (Otherwise, the topic is only partially created // in ZK.) - - + + fKafkaAdminClient.deleteTopics(Arrays.asList(topic)); log.info("Zookeeper client loaded successfully. Deleting topic."); - + } catch (Exception e) { log.error("Failed to delete topic [" + topic + "]. " + e.getMessage(), e); throw new ConfigDbException(e); @@ -265,7 +260,7 @@ public class DMaaPKafkaMetaBroker implements Broker1 { /** * method Providing KafkaTopic Object associated with owner and * transactionenabled or not - * + * * @param name * @param desc * @param owner @@ -280,7 +275,7 @@ public class DMaaPKafkaMetaBroker implements Broker1 { /** * static method giving kafka topic object - * + * * @param db * @param basePath * @param name @@ -291,7 +286,7 @@ public class DMaaPKafkaMetaBroker implements Broker1 { * @throws ConfigDbException */ public static KafkaTopic createTopicEntry(ConfigDb db, ConfigPath basePath, String name, String desc, String owner, - boolean transactionEnabled) throws ConfigDbException { + boolean transactionEnabled) throws ConfigDbException { final JSONObject o = new JSONObject(); o.put("owner", owner); o.put("description", desc); @@ -303,14 +298,14 @@ public class DMaaPKafkaMetaBroker implements Broker1 { /** * class performing all user opearation like user is eligible to read, * write. permitting a user to write and read, - * + * * @author anowarul.islam * */ public static class KafkaTopic implements Topic { /** * constructor initializes - * + * * @param name * @param configdb * @param baseTopic @@ -330,26 +325,26 @@ public class DMaaPKafkaMetaBroker implements Broker1 { fOwner = o.optString("owner", ""); fDesc = o.optString("description", ""); fTransactionEnabled = o.optBoolean("txenabled", false);// default - // value is - // false + // value is + // false // if this topic has an owner, it needs both read/write ACLs. If there's no - // owner (or it's empty), null is okay -- this is for existing or implicitly - // created topics. - JSONObject readers = o.optJSONObject ( "readers" ); - if ( readers == null && fOwner.length () > 0 ) - { - readers = kEmptyAcl; - } - fReaders = fromJson ( readers ); - - JSONObject writers = o.optJSONObject ( "writers" ); - if ( writers == null && fOwner.length () > 0 ) - { - writers = kEmptyAcl; - } - fWriters = fromJson ( writers ); + // owner (or it's empty), null is okay -- this is for existing or implicitly + // created topics. + JSONObject readers = o.optJSONObject ( "readers" ); + if ( readers == null && fOwner.length () > 0 ) + { + readers = kEmptyAcl; + } + fReaders = fromJson ( readers ); + + JSONObject writers = o.optJSONObject ( "writers" ); + if ( writers == null && fOwner.length () > 0 ) + { + writers = kEmptyAcl; + } + fWriters = fromJson ( writers ); } - + private NsaAcl fromJson(JSONObject o) { NsaAcl acl = new NsaAcl(); if (o != null) { @@ -427,7 +422,7 @@ public class DMaaPKafkaMetaBroker implements Broker1 { try { final NsaAcl acl = NsaAclUtils.updateAcl ( this, asUser, key, reader, add ); - + // we have to assume we have current data, or load it again. for the expected use // case, assuming we can overwrite the data is fine. final JSONObject o = new JSONObject (); @@ -435,15 +430,15 @@ public class DMaaPKafkaMetaBroker implements Broker1 { o.put ( "readers", safeSerialize ( reader ? acl : fReaders ) ); o.put ( "writers", safeSerialize ( reader ? fWriters : acl ) ); fConfigDb.store ( fBaseTopicData.getChild ( fName ), o.toString () ); - + log.info ( "ACL_UPDATE: " + asUser.getKey () + " " + ( add ? "added" : "removed" ) + ( reader?"subscriber":"publisher" ) + " " + key + " on " + fName ); - + } catch ( ConfigDbException | AccessDeniedException x ) { throw x; } - + } private JSONObject safeSerialize(NsaAcl acl) { @@ -458,7 +453,7 @@ public class DMaaPKafkaMetaBroker implements Broker1 { private final NsaAcl fReaders; private final NsaAcl fWriters; private boolean fTransactionEnabled; - + public boolean isTransactionEnabled() { return fTransactionEnabled; } diff --git a/src/main/java/org/onap/dmaap/dmf/mr/utils/Utils.java b/src/main/java/org/onap/dmaap/dmf/mr/utils/Utils.java index 662f0f7..c420072 100644 --- a/src/main/java/org/onap/dmaap/dmf/mr/utils/Utils.java +++ b/src/main/java/org/onap/dmaap/dmf/mr/utils/Utils.java @@ -8,30 +8,30 @@ * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * http://www.apache.org/licenses/LICENSE-2.0 -* + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * ============LICENSE_END========================================================= - * + * * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * + * *******************************************************************************/ package org.onap.dmaap.dmf.mr.utils; import com.att.eelf.configuration.EELFLogger; import com.att.eelf.configuration.EELFManager; -import org.onap.dmaap.dmf.mr.beans.DMaaPContext; - -import javax.servlet.http.HttpServletRequest; import java.io.IOException; import java.io.InputStream; import java.security.Principal; import java.text.DecimalFormat; import java.text.SimpleDateFormat; import java.util.*; +import javax.servlet.http.HttpServletRequest; +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.onap.dmaap.dmf.mr.beans.DMaaPContext; /** * This is an utility class for various operations for formatting @@ -46,13 +46,14 @@ public class Utils { private static final String BATCH_ID_FORMAT = "000000"; private static final String X509_ATTR = "javax.servlet.request.X509Certificate"; private static final EELFLogger log = EELFManager.getInstance().getLogger(Utils.class); + public static final String SASL_MECH = "sasl.mechanism"; private Utils() { super(); } /** - * Formatting the date + * Formatting the date * @param date * @return date or null */ @@ -128,55 +129,71 @@ public class Utils { */ public static long getSleepMsForRate ( double ratePerMinute ) { - if ( ratePerMinute <= 0.0 ) + if ( ratePerMinute <= 0.0 ) { return 0; } return Math.max ( 1000, Math.round ( 60 * 1000 / ratePerMinute ) ); } - public static String getRemoteAddress(DMaaPContext ctx) - { - String reqAddr = ctx.getRequest().getRemoteAddr(); - String fwdHeader = getFirstHeader("X-Forwarded-For",ctx); - return ((fwdHeader != null) ? fwdHeader : reqAddr); - } - public static String getFirstHeader(String h,DMaaPContext ctx) - { - List l = getHeader(h,ctx); - return ((l.size() > 0) ? (String)l.iterator().next() : null); - } - public static List<String> getHeader(String h,DMaaPContext ctx) - { - LinkedList list = new LinkedList(); - Enumeration e = ctx.getRequest().getHeaders(h); - while (e.hasMoreElements()) - { - list.add(e.nextElement().toString()); - } - return list; - } - - public static String getKafkaproperty(){ - InputStream input = new Utils().getClass().getResourceAsStream("/kafka.properties"); - Properties props = new Properties(); - try { - props.load(input); - } catch (IOException e) { - log.error("failed to read kafka.properties"); - } - return props.getProperty("key"); - - - } - - public static boolean isCadiEnabled(){ - boolean enableCadi=false; - if(System.getenv("enableCadi")!=null&&System.getenv("enableCadi").equals("true")){ - enableCadi=true; - } - - return enableCadi; - } - + public static String getRemoteAddress(DMaaPContext ctx) + { + String reqAddr = ctx.getRequest().getRemoteAddr(); + String fwdHeader = getFirstHeader("X-Forwarded-For",ctx); + return ((fwdHeader != null) ? fwdHeader : reqAddr); + } + public static String getFirstHeader(String h,DMaaPContext ctx) + { + List l = getHeader(h,ctx); + return ((l.size() > 0) ? (String)l.iterator().next() : null); + } + public static List<String> getHeader(String h,DMaaPContext ctx) + { + LinkedList list = new LinkedList(); + Enumeration e = ctx.getRequest().getHeaders(h); + while (e.hasMoreElements()) + { + list.add(e.nextElement().toString()); + } + return list; + } + + public static String getKafkaproperty(){ + InputStream input = new Utils().getClass().getResourceAsStream("/kafka.properties"); + Properties props = new Properties(); + try { + props.load(input); + } catch (IOException e) { + log.error("failed to read kafka.properties"); + } + return props.getProperty("key"); + + + } + + public static boolean isCadiEnabled(){ + boolean enableCadi=false; + if(System.getenv("enableCadi")!=null&&System.getenv("enableCadi").equals("true")){ + enableCadi=true; + } + + return enableCadi; + } + + public static Properties addSaslProps(){ + Properties props = new Properties(); + String saslMech = System.getenv("SASLMECH"); + if (saslMech != null && saslMech.equals("scram-sha-512")) { + props.put("sasl.jaas.config", System.getenv("JAASLOGIN")); + props.put(SASL_MECH, saslMech.toUpperCase()); + } + else { + props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='" + getKafkaproperty() + "';"); + props.put(SASL_MECH, "PLAIN"); + } + props.put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT"); + log.info("KafkaAdmin sasl.mechanism set to " + props.getProperty(SASL_MECH)); + return props; + + } } diff --git a/src/test/java/org/onap/dmaap/mr/cambria/backends/kafka/KafkaPublisherTest.java b/src/test/java/org/onap/dmaap/mr/cambria/backends/kafka/KafkaPublisherTest.java index 7a0fe78..44047e4 100644 --- a/src/test/java/org/onap/dmaap/mr/cambria/backends/kafka/KafkaPublisherTest.java +++ b/src/test/java/org/onap/dmaap/mr/cambria/backends/kafka/KafkaPublisherTest.java @@ -44,7 +44,7 @@ public class KafkaPublisherTest { public void setUp() throws Exception { MockitoAnnotations.initMocks(this); PowerMockito.mockStatic(Utils.class); - PowerMockito.when(Utils.isCadiEnabled()).thenReturn(true); + PowerMockito.when(Utils.isCadiEnabled()).thenReturn(false); } diff --git a/src/test/java/org/onap/dmaap/mr/cambria/embed/EmbedConfigurationReader.java b/src/test/java/org/onap/dmaap/mr/cambria/embed/EmbedConfigurationReader.java index f49f615..9d7a931 100644 --- a/src/test/java/org/onap/dmaap/mr/cambria/embed/EmbedConfigurationReader.java +++ b/src/test/java/org/onap/dmaap/mr/cambria/embed/EmbedConfigurationReader.java @@ -9,7 +9,7 @@ * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -18,26 +18,27 @@ * ============LICENSE_END========================================================= * * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * + * *******************************************************************************/ - package org.onap.dmaap.mr.cambria.embed; +package org.onap.dmaap.mr.cambria.embed; +import com.att.ajsc.filemonitor.AJSCPropertiesMap; +import com.att.nsa.security.db.BaseNsaApiDbImpl; +import com.att.nsa.security.db.simple.NsaSimpleApiKey; +import com.att.nsa.security.db.simple.NsaSimpleApiKeyFactory; import java.io.File; import java.util.Arrays; import java.util.Map; import java.util.Properties; - import org.apache.commons.io.FileUtils; import org.apache.curator.framework.CuratorFramework; - -import com.att.ajsc.filemonitor.AJSCPropertiesMap; -import org.onap.dmaap.dmf.mr.backends.kafka.KafkaPublisher; -import org.onap.dmaap.dmf.mr.backends.memory.MemoryMetaBroker; -import org.onap.dmaap.dmf.mr.backends.memory.MemoryQueue; import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.AdminClientConfig; import org.apache.kafka.clients.admin.NewTopic; +import org.onap.dmaap.dmf.mr.backends.kafka.KafkaPublisher; +import org.onap.dmaap.dmf.mr.backends.memory.MemoryMetaBroker; +import org.onap.dmaap.dmf.mr.backends.memory.MemoryQueue; import org.onap.dmaap.dmf.mr.beans.DMaaPKafkaConsumerFactory; import org.onap.dmaap.dmf.mr.beans.DMaaPKafkaMetaBroker; import org.onap.dmaap.dmf.mr.beans.DMaaPMetricsSet; @@ -49,13 +50,11 @@ import org.onap.dmaap.dmf.mr.security.DMaaPAuthenticatorImpl; import org.onap.dmaap.dmf.mr.utils.ConfigurationReader; import org.onap.dmaap.dmf.mr.utils.DMaaPCuratorFactory; import org.onap.dmaap.dmf.mr.utils.PropertyReader; -import com.att.nsa.security.db.BaseNsaApiDbImpl; -import com.att.nsa.security.db.simple.NsaSimpleApiKey; -import com.att.nsa.security.db.simple.NsaSimpleApiKeyFactory; +import org.onap.dmaap.dmf.mr.utils.Utils; public class EmbedConfigurationReader { - private static final String DEFAULT_KAFKA_LOG_DIR = "/kafka_embedded"; + private static final String DEFAULT_KAFKA_LOG_DIR = "/kafka_embedded"; public static final String TEST_TOPIC = "testTopic"; private static final int BROKER_ID = 0; private static final int BROKER_PORT = 5000; @@ -69,49 +68,49 @@ public class EmbedConfigurationReader { String dir; private AdminClient fKafkaAdminClient; KafkaLocal kafkaLocal; - - public void setUp() throws Exception { - - ClassLoader classLoader = getClass().getClassLoader(); - AJSCPropertiesMap.refresh(new File(classLoader.getResource(CambriaConstants.msgRtr_prop).getFile())); - - Properties kafkaProperties; + + public void setUp() throws Exception { + + ClassLoader classLoader = getClass().getClassLoader(); + AJSCPropertiesMap.refresh(new File(classLoader.getResource(CambriaConstants.msgRtr_prop).getFile())); + + Properties kafkaProperties; Properties zkProperties; try { //load properties - dir = new File(classLoader.getResource(CambriaConstants.msgRtr_prop).getFile()).getParent(); + dir = new File(classLoader.getResource(CambriaConstants.msgRtr_prop).getFile()).getParent(); kafkaProperties = getKafkaProperties(dir + DEFAULT_KAFKA_LOG_DIR, BROKER_PORT, BROKER_ID); zkProperties = getZookeeperProperties(ZOOKEEPER_PORT,dir + DEFAULT_ZOOKEEPER_LOG_DIR); //start kafkaLocalServer kafkaLocal = new KafkaLocal(kafkaProperties, zkProperties); - + Map<String, String> map = AJSCPropertiesMap.getProperties(CambriaConstants.msgRtr_prop); map.put(CambriaConstants.kSetting_ZkConfigDbServers, ZOOKEEPER_HOST); map.put("kafka.client.zookeeper", ZOOKEEPER_HOST); map.put("kafka.metadata.broker.list", LOCALHOST_BROKER); - + DMaaPZkClient dMaaPZkClient = new DMaaPZkClient(new PropertyReader()); - + final Properties props = new Properties (); - props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092" ); - props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='admin_secret';"); - props.put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT"); - props.put("sasl.mechanism", "PLAIN"); - fKafkaAdminClient = AdminClient.create ( props ); - - // if(!AdminUtils.topicExists(dMaaPZkClient, TEST_TOPIC)) + props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092" ); + props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='admin_secret';"); + props.put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT"); + props.put("sasl.mechanism", "PLAIN"); + fKafkaAdminClient = AdminClient.create ( props ); + + // if(!AdminUtils.topicExists(dMaaPZkClient, TEST_TOPIC)) // AdminUtils.createTopic(dMaaPZkClient, TEST_TOPIC, 3, 1, new Properties()); - final NewTopic topicRequest = new NewTopic ( TEST_TOPIC, 3, new Integer(1).shortValue () ); - fKafkaAdminClient.createTopics ( Arrays.asList ( topicRequest ) ); + final NewTopic topicRequest = new NewTopic ( TEST_TOPIC, 3, new Integer(1).shortValue () ); + fKafkaAdminClient.createTopics ( Arrays.asList ( topicRequest ) ); Thread.sleep(5000); } catch (Exception e){ e.printStackTrace(System.out); - } - } - - private static Properties getKafkaProperties(String logDir, int port, int brokerId) { + } + } + + private static Properties getKafkaProperties(String logDir, int port, int brokerId) { Properties properties = new Properties(); properties.put("port", port + ""); properties.put("broker.id", brokerId + ""); @@ -122,47 +121,47 @@ public class EmbedConfigurationReader { properties.put("consumer.timeout.ms", -1); return properties; } - - private static Properties getZookeeperProperties(int port, String zookeeperDir) { + + private static Properties getZookeeperProperties(int port, String zookeeperDir) { Properties properties = new Properties(); properties.put("clientPort", port + ""); properties.put("dataDir", zookeeperDir); return properties; } - public void tearDown() throws Exception { - DMaaPZkClient dMaaPZkClient = new DMaaPZkClient(new PropertyReader()); - if(fKafkaAdminClient!=null) - fKafkaAdminClient.deleteTopics(Arrays.asList(TEST_TOPIC)); - //AdminUtils.deleteTopic(dMaaPZkClient, TEST_TOPIC); - //dMaaPZkClient.delete(dir + DEFAULT_KAFKA_LOG_DIR); - //dMaaPZkClient.delete(dir + DEFAULT_ZOOKEEPER_LOG_DIR); - kafkaLocal.stop(); - FileUtils.cleanDirectory(new File(dir + DEFAULT_KAFKA_LOG_DIR)); - } - - - public ConfigurationReader buildConfigurationReader() throws Exception { - - setUp(); - - PropertyReader propertyReader = new PropertyReader(); - DMaaPMetricsSet dMaaPMetricsSet = new DMaaPMetricsSet(propertyReader); - DMaaPZkClient dMaaPZkClient = new DMaaPZkClient(propertyReader); - DMaaPZkConfigDb dMaaPZkConfigDb = new DMaaPZkConfigDb(dMaaPZkClient, propertyReader); - CuratorFramework curatorFramework = DMaaPCuratorFactory.getCurator(new PropertyReader()); - DMaaPKafkaConsumerFactory dMaaPKafkaConsumerFactory = new DMaaPKafkaConsumerFactory(dMaaPMetricsSet, curatorFramework,null); - MemoryQueue memoryQueue = new MemoryQueue(); - MemoryMetaBroker memoryMetaBroker = new MemoryMetaBroker(memoryQueue, dMaaPZkConfigDb); - BaseNsaApiDbImpl<NsaSimpleApiKey> baseNsaApiDbImpl = new BaseNsaApiDbImpl<>(dMaaPZkConfigDb, new NsaSimpleApiKeyFactory()); - DMaaPAuthenticator<NsaSimpleApiKey> dMaaPAuthenticator = new DMaaPAuthenticatorImpl<>(baseNsaApiDbImpl); - KafkaPublisher kafkaPublisher = new KafkaPublisher(propertyReader); - DMaaPKafkaMetaBroker dMaaPKafkaMetaBroker = new DMaaPKafkaMetaBroker(propertyReader, dMaaPZkClient, dMaaPZkConfigDb); - - return new ConfigurationReader(propertyReader, - dMaaPMetricsSet, dMaaPZkClient, dMaaPZkConfigDb, kafkaPublisher, - curatorFramework, dMaaPKafkaConsumerFactory, dMaaPKafkaMetaBroker, - memoryQueue, memoryMetaBroker, baseNsaApiDbImpl, dMaaPAuthenticator); - - } + public void tearDown() throws Exception { + DMaaPZkClient dMaaPZkClient = new DMaaPZkClient(new PropertyReader()); + if(fKafkaAdminClient!=null) + fKafkaAdminClient.deleteTopics(Arrays.asList(TEST_TOPIC)); + //AdminUtils.deleteTopic(dMaaPZkClient, TEST_TOPIC); + //dMaaPZkClient.delete(dir + DEFAULT_KAFKA_LOG_DIR); + //dMaaPZkClient.delete(dir + DEFAULT_ZOOKEEPER_LOG_DIR); + kafkaLocal.stop(); + FileUtils.cleanDirectory(new File(dir + DEFAULT_KAFKA_LOG_DIR)); + } + + + public ConfigurationReader buildConfigurationReader() throws Exception { + + setUp(); + + PropertyReader propertyReader = new PropertyReader(); + DMaaPMetricsSet dMaaPMetricsSet = new DMaaPMetricsSet(propertyReader); + DMaaPZkClient dMaaPZkClient = new DMaaPZkClient(propertyReader); + DMaaPZkConfigDb dMaaPZkConfigDb = new DMaaPZkConfigDb(dMaaPZkClient, propertyReader); + CuratorFramework curatorFramework = DMaaPCuratorFactory.getCurator(new PropertyReader()); + DMaaPKafkaConsumerFactory dMaaPKafkaConsumerFactory = new DMaaPKafkaConsumerFactory(dMaaPMetricsSet, curatorFramework,null); + MemoryQueue memoryQueue = new MemoryQueue(); + MemoryMetaBroker memoryMetaBroker = new MemoryMetaBroker(memoryQueue, dMaaPZkConfigDb); + BaseNsaApiDbImpl<NsaSimpleApiKey> baseNsaApiDbImpl = new BaseNsaApiDbImpl<>(dMaaPZkConfigDb, new NsaSimpleApiKeyFactory()); + DMaaPAuthenticator<NsaSimpleApiKey> dMaaPAuthenticator = new DMaaPAuthenticatorImpl<>(baseNsaApiDbImpl); + KafkaPublisher kafkaPublisher = new KafkaPublisher(propertyReader); + DMaaPKafkaMetaBroker dMaaPKafkaMetaBroker = new DMaaPKafkaMetaBroker(propertyReader, dMaaPZkClient, dMaaPZkConfigDb); + + return new ConfigurationReader(propertyReader, + dMaaPMetricsSet, dMaaPZkClient, dMaaPZkConfigDb, kafkaPublisher, + curatorFramework, dMaaPKafkaConsumerFactory, dMaaPKafkaMetaBroker, + memoryQueue, memoryMetaBroker, baseNsaApiDbImpl, dMaaPAuthenticator); + + } } diff --git a/src/test/java/org/onap/dmaap/mr/cambria/utils/UtilsTest.java b/src/test/java/org/onap/dmaap/mr/cambria/utils/UtilsTest.java index 8a4009b..74f6750 100644 --- a/src/test/java/org/onap/dmaap/mr/cambria/utils/UtilsTest.java +++ b/src/test/java/org/onap/dmaap/mr/cambria/utils/UtilsTest.java @@ -1,5 +1,5 @@ /******************************************************************************* -/*- + /*- * ============LICENSE_START======================================================= * ONAP Policy Engine * ================================================================================ @@ -8,9 +8,9 @@ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -18,22 +18,26 @@ * limitations under the License. * ============LICENSE_END========================================================= */ - - package org.onap.dmaap.mr.cambria.utils; + +package org.onap.dmaap.mr.cambria.utils; import static org.junit.Assert.*; import java.security.Principal; import java.text.SimpleDateFormat; import java.util.Date; +import java.util.Properties; import org.apache.http.auth.BasicUserPrincipal; import org.junit.After; import org.junit.Before; +import org.junit.Rule; import org.junit.Test; +import org.junit.contrib.java.lang.system.EnvironmentVariables; import org.springframework.mock.web.MockHttpServletRequest; + import org.onap.dmaap.dmf.mr.beans.DMaaPContext; import org.onap.dmaap.dmf.mr.utils.Utils; @@ -41,6 +45,9 @@ public class UtilsTest { private static final String DATE_FORMAT = "dd-MM-yyyy::hh:mm:ss:SSS"; + @Rule + public EnvironmentVariables environmentVariables = new EnvironmentVariables(); + @Before public void setUp() throws Exception { } @@ -57,33 +64,33 @@ public class UtilsTest { String expectedStr = sdf.format(now); assertNotNull(dateStr); assertTrue("Formatted date does not match - expected [" + expectedStr - + "] received [" + dateStr + "]", + + "] received [" + dateStr + "]", dateStr.equalsIgnoreCase(expectedStr)); } - + @Test public void testgetUserApiKey(){ MockHttpServletRequest request = new MockHttpServletRequest(); request.addHeader(Utils.CAMBRIA_AUTH_HEADER, "User:Password"); assertEquals("User", Utils.getUserApiKey(request)); - + MockHttpServletRequest request2 = new MockHttpServletRequest(); Principal principal = new BasicUserPrincipal("User@Test"); request2.setUserPrincipal(principal); request2.addHeader("Authorization", "test"); assertEquals("User", Utils.getUserApiKey(request2)); - + MockHttpServletRequest request3 = new MockHttpServletRequest(); assertNull(Utils.getUserApiKey(request3)); } - + @Test public void testgetFromattedBatchSequenceId(){ Long x = new Long(1234); String str = Utils.getFromattedBatchSequenceId(x); - assertEquals("001234", str); + assertEquals("001234", str); } - + @Test public void testmessageLengthInBytes(){ String str = "TestString"; @@ -99,38 +106,58 @@ public class UtilsTest { assertNull(Utils.getResponseTransactionId(null)); assertNull(Utils.getResponseTransactionId("")); } - + @Test public void testgetSleepMsForRate(){ long x = Utils.getSleepMsForRate(1024.124); assertEquals(1000, x); assertEquals(0, Utils.getSleepMsForRate(-1)); } - + @Test public void testgetRemoteAddress(){ DMaaPContext dMaapContext = new DMaaPContext(); MockHttpServletRequest request = new MockHttpServletRequest(); - + dMaapContext.setRequest(request); - + assertEquals(request.getRemoteAddr(), Utils.getRemoteAddress(dMaapContext)); - + request.addHeader("X-Forwarded-For", "XForward"); assertEquals("XForward", Utils.getRemoteAddress(dMaapContext)); - - + + } - + @Test public void testGetKey(){ assertNotNull(Utils.getKafkaproperty()); - + } - + @Test public void testCadiEnable(){ assertFalse(Utils.isCadiEnabled()); - + + } + + @Test + public void testaddSaslPropsPlain() { + Properties props = new Properties(); + props.put("security.protocol", "SASL_PLAINTEXT"); + props.put(Utils.SASL_MECH, "PLAIN"); + props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='admin_secret';"); + assertEquals(props, Utils.addSaslProps()); + } + + @Test + public void testaddSaslPropsScram(){ + Properties props = new Properties(); + environmentVariables.set("SASLMECH", "scram-sha-512"); + environmentVariables.set("JAASLOGIN", "org.apache.kafka.common.security.scram.ScramLoginModule required username='onap-dmaap-strimzi-kafka-admin' password='qul6A3TLvidY';"); + props.put("security.protocol", "SASL_PLAINTEXT"); + props.put(Utils.SASL_MECH, "SCRAM-SHA-512"); + props.put("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username='onap-dmaap-strimzi-kafka-admin' password='qul6A3TLvidY';"); + assertEquals(props, Utils.addSaslProps()); } } |