summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--pom.xml6
-rw-r--r--src/main/java/org/onap/dmaap/dmf/mr/backends/kafka/KafkaPublisher.java137
-rw-r--r--src/main/java/org/onap/dmaap/dmf/mr/beans/DMaaPKafkaConsumerFactory.java59
-rw-r--r--src/main/java/org/onap/dmaap/dmf/mr/beans/DMaaPKafkaMetaBroker.java171
-rw-r--r--src/main/java/org/onap/dmaap/dmf/mr/utils/Utils.java121
-rw-r--r--src/test/java/org/onap/dmaap/mr/cambria/backends/kafka/KafkaPublisherTest.java2
-rw-r--r--src/test/java/org/onap/dmaap/mr/cambria/embed/EmbedConfigurationReader.java147
-rw-r--r--src/test/java/org/onap/dmaap/mr/cambria/utils/UtilsTest.java73
8 files changed, 379 insertions, 337 deletions
diff --git a/pom.xml b/pom.xml
index 6035e25..12e4b98 100644
--- a/pom.xml
+++ b/pom.xml
@@ -719,6 +719,12 @@
<artifactId>slf4j-api</artifactId>
<version>1.7.32</version>
</dependency>
+ <dependency>
+ <groupId>com.github.stefanbirkner</groupId>
+ <artifactId>system-rules</artifactId>
+ <version>1.17.2</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<profiles>
<!-- Use this profile to run the AJSC locally. This profile can be successfully
diff --git a/src/main/java/org/onap/dmaap/dmf/mr/backends/kafka/KafkaPublisher.java b/src/main/java/org/onap/dmaap/dmf/mr/backends/kafka/KafkaPublisher.java
index 62dc2a5..ac40603 100644
--- a/src/main/java/org/onap/dmaap/dmf/mr/backends/kafka/KafkaPublisher.java
+++ b/src/main/java/org/onap/dmaap/dmf/mr/backends/kafka/KafkaPublisher.java
@@ -8,22 +8,23 @@
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
-*
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* ============LICENSE_END=========================================================
- *
+ *
* ECOMP is a trademark and service mark of AT&T Intellectual Property.
- *
+ *
*******************************************************************************/
package org.onap.dmaap.dmf.mr.backends.kafka;
import com.att.eelf.configuration.EELFLogger;
import com.att.eelf.configuration.EELFManager;
import com.att.nsa.drumlin.till.nv.rrNvReadable;
+import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
@@ -43,9 +44,9 @@ import java.util.Properties;
/**
* Sends raw JSON objects into Kafka.
- *
+ *
* Could improve space: BSON rather than JSON?
- *
+ *
* @author peter
*
*/
@@ -53,7 +54,7 @@ import java.util.Properties;
public class KafkaPublisher implements Publisher {
/**
* constructor initializing
- *
+ *
* @param settings
* @throws rrNvReadable.missingReqdSetting
*/
@@ -62,35 +63,33 @@ public class KafkaPublisher implements Publisher {
final Properties props = new Properties();
String kafkaConnUrl= com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"kafka.metadata.broker.list");
if(StringUtils.isEmpty(kafkaConnUrl)){
-
+
kafkaConnUrl="localhost:9092";
}
-
-
- if(Utils.isCadiEnabled()){
- transferSetting( props, "sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='"+Utils.getKafkaproperty()+"';");
- transferSetting( props, "security.protocol", "SASL_PLAINTEXT");
- transferSetting( props, "sasl.mechanism", "PLAIN");
- }
+
+
+ if(Utils.isCadiEnabled()){
+ props.putAll(Utils.addSaslProps());
+ }
transferSetting( props, "bootstrap.servers",kafkaConnUrl);
-
+
transferSetting( props, "request.required.acks", "1");
transferSetting( props, "message.send.max.retries", "5");
- transferSetting(props, "retry.backoff.ms", "150");
+ transferSetting(props, "retry.backoff.ms", "150");
+
+
-
-
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
-
-
+
+
fProducer = new KafkaProducer<>(props);
}
/**
* Send a message with a given topic and key.
- *
+ *
* @param msg
* @throws FailedToSendMessageException
* @throws JSONException
@@ -102,21 +101,21 @@ public class KafkaPublisher implements Publisher {
sendMessages(topic, msgs);
}
- /**
+ /**
* method publishing batch messages
- * This method is commented from 0.8 to 0.11 upgrade
+ * This method is commented from 0.8 to 0.11 upgrade
* @param topic
* @param kms
* throws IOException
*
public void sendBatchMessage(String topic, ArrayList<KeyedMessage<String, String>> kms) throws IOException {
- try {
- fProducer.send(kms);
+ try {
+ fProducer.send(kms);
- } catch (FailedToSendMessageException excp) {
- log.error("Failed to send message(s) to topic [" + topic + "].", excp);
- throw new FailedToSendMessageException(excp.getMessage(), excp);
- }
+ } catch (FailedToSendMessageException excp) {
+ log.error("Failed to send message(s) to topic [" + topic + "].", excp);
+ throw new FailedToSendMessageException(excp.getMessage(), excp);
+ }
} */
@@ -131,63 +130,63 @@ public class KafkaPublisher implements Publisher {
fProducer.send(km);
}
- } catch (Exception excp) {
+ } catch (Exception excp) {
log.error("Failed to send message(s) to topic [" + topic + "].", excp);
throw new IOException(excp.getMessage(), excp);
}
}
-
+
/**
* Send a set of messages. Each must have a "key" string value.
- *
+ *
* @param topic
* @param msg
* @throws FailedToSendMessageException
* @throws JSONException
*
+ @Override
+ public void sendMessages(String topic, List<? extends message> msgs)
+ throws IOException, FailedToSendMessageException {
+ log.info("sending " + msgs.size() + " events to [" + topic + "]");
+
+ final List<KeyedMessage<String, String>> kms = new ArrayList<KeyedMessage<String, String>>(msgs.size());
+ for (message o : msgs) {
+ final KeyedMessage<String, String> data = new KeyedMessage<String, String>(topic, o.getKey(), o.toString());
+ kms.add(data);
+ }
+ try {
+ fProducer.send(kms);
+
+ } catch (FailedToSendMessageException excp) {
+ log.error("Failed to send message(s) to topic [" + topic + "].", excp);
+ throw new FailedToSendMessageException(excp.getMessage(), excp);
+ }
+ } */
@Override
- public void sendMessages(String topic, List<? extends message> msgs)
- throws IOException, FailedToSendMessageException {
+ public void sendMessagesNew(String topic, List<? extends message> msgs) throws IOException {
log.info("sending " + msgs.size() + " events to [" + topic + "]");
-
- final List<KeyedMessage<String, String>> kms = new ArrayList<KeyedMessage<String, String>>(msgs.size());
- for (message o : msgs) {
- final KeyedMessage<String, String> data = new KeyedMessage<String, String>(topic, o.getKey(), o.toString());
- kms.add(data);
- }
try {
- fProducer.send(kms);
-
- } catch (FailedToSendMessageException excp) {
- log.error("Failed to send message(s) to topic [" + topic + "].", excp);
- throw new FailedToSendMessageException(excp.getMessage(), excp);
+ for (message o : msgs) {
+ final ProducerRecord<String, String> data =
+ new ProducerRecord<>(topic, o.getKey(), o.toString());
+ fProducer.send(data);
+ }
+ } catch (Exception e) {
+ log.error("Failed to send message(s) to topic [" + topic + "].", e);
}
- } */
- @Override
- public void sendMessagesNew(String topic, List<? extends message> msgs) throws IOException {
- log.info("sending " + msgs.size() + " events to [" + topic + "]");
- try {
- for (message o : msgs) {
- final ProducerRecord<String, String> data =
- new ProducerRecord<>(topic, o.getKey(), o.toString());
- fProducer.send(data);
- }
- } catch (Exception e) {
- log.error("Failed to send message(s) to topic [" + topic + "].", e);
- }
- }
-
-
+ }
+
+
private Producer<String, String> fProducer;
- /**
- * It sets the key value pair
- * @param topic
- * @param msg
- * @param key
- * @param defVal
- */
+ /**
+ * It sets the key value pair
+ * @param topic
+ * @param msg
+ * @param key
+ * @param defVal
+ */
private void transferSetting(Properties props, String key, String defVal) {
String kafkaProp= com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"kafka." + key);
if (StringUtils.isEmpty(kafkaProp)) kafkaProp=defVal;
@@ -199,6 +198,6 @@ public class KafkaPublisher implements Publisher {
@Override
public void sendMessages(String topic, List<? extends message> msgs) throws IOException {
// TODO Auto-generated method stub
-
+
}
}
diff --git a/src/main/java/org/onap/dmaap/dmf/mr/beans/DMaaPKafkaConsumerFactory.java b/src/main/java/org/onap/dmaap/dmf/mr/beans/DMaaPKafkaConsumerFactory.java
index 26a8cf4..6f5a17c 100644
--- a/src/main/java/org/onap/dmaap/dmf/mr/beans/DMaaPKafkaConsumerFactory.java
+++ b/src/main/java/org/onap/dmaap/dmf/mr/beans/DMaaPKafkaConsumerFactory.java
@@ -8,16 +8,16 @@
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
-*
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* ============LICENSE_END=========================================================
- *
+ *
* ECOMP is a trademark and service mark of AT&T Intellectual Property.
- *
+ *
*******************************************************************************/
package org.onap.dmaap.dmf.mr.beans;
@@ -27,6 +27,7 @@ import com.att.eelf.configuration.EELFManager;
import com.att.nsa.drumlin.till.nv.rrNvReadable.missingReqdSetting;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
+import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.onap.dmaap.dmf.mr.CambriaApiException;
import org.onap.dmaap.dmf.mr.backends.Consumer;
@@ -52,13 +53,13 @@ import java.util.concurrent.TimeUnit;
*/
public class DMaaPKafkaConsumerFactory implements ConsumerFactory {
-
+
private static final EELFLogger log = EELFManager.getInstance().getLogger(DMaaPKafkaConsumerFactory.class);
-
+
/**
* constructor initialization
- *
+ *
* @param settings
* @param metrics
* @param curator
@@ -68,8 +69,8 @@ public class DMaaPKafkaConsumerFactory implements ConsumerFactory {
*/
public DMaaPKafkaConsumerFactory(@Qualifier("dMaaPMetricsSet") MetricsSet metrics,
- @Qualifier("curator") CuratorFramework curator,
- @Qualifier("kafkalockavoid") KafkaLiveLockAvoider2 kafkaLiveLockAvoider)
+ @Qualifier("curator") CuratorFramework curator,
+ @Qualifier("kafkalockavoid") KafkaLiveLockAvoider2 kafkaLiveLockAvoider)
throws missingReqdSetting, KafkaConsumerCacheException, UnknownHostException {
String apiNodeId = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
@@ -97,7 +98,7 @@ public class DMaaPKafkaConsumerFactory implements ConsumerFactory {
final boolean isCacheEnabled = kSetting_EnableCache;
-
+
fCache = null;
if (isCacheEnabled) {
fCache = KafkaConsumerCache.getInstance();
@@ -108,15 +109,15 @@ public class DMaaPKafkaConsumerFactory implements ConsumerFactory {
fCache.setfApiId(apiNodeId);
fCache.startCache(mode, curator);
if(kafkaLiveLockAvoider!=null){
- kafkaLiveLockAvoider.startNewWatcherForServer(apiNodeId, makeAvoidanceCallback(apiNodeId));
- fkafkaLiveLockAvoider = kafkaLiveLockAvoider;
+ kafkaLiveLockAvoider.startNewWatcherForServer(apiNodeId, makeAvoidanceCallback(apiNodeId));
+ fkafkaLiveLockAvoider = kafkaLiveLockAvoider;
}
}
}
/*
* getConsumerFor
- *
+ *
* @see
* com.att.dmf.mr.backends.ConsumerFactory#getConsumerFor(java.lang.String,
* java.lang.String, java.lang.String, int, java.lang.String) This method is
@@ -128,7 +129,7 @@ public class DMaaPKafkaConsumerFactory implements ConsumerFactory {
*/
@Override
public Consumer getConsumerFor(String topic, String consumerGroupName, String consumerId, int timeoutMs,
- String remotehost) throws UnavailableException, CambriaApiException {
+ String remotehost) throws UnavailableException, CambriaApiException {
Kafka011Consumer kc;
// To synchronize based on the consumer group.
@@ -179,11 +180,11 @@ public class DMaaPKafkaConsumerFactory implements ConsumerFactory {
log.info("Creating Kafka consumer for group [" + consumerGroupName + "], consumer [" + consumerId
+ "], on topic [" + topic + "].");
-
+
if (fCache != null) {
fCache.signalOwnership(topic, consumerGroupName, consumerId);
}
-
+
final Properties props = createConsumerConfig(topic,consumerGroupName, consumerId);
long fCreateTimeMs = System.currentTimeMillis();
KafkaConsumer<String, String> cc = new KafkaConsumer<>(props);
@@ -210,7 +211,7 @@ public class DMaaPKafkaConsumerFactory implements ConsumerFactory {
+ consumerId);
log.error("Failed and go to Exception block " + e.getMessage() + " " + consumerGroupName + "/"
+ consumerId);
-
+
} finally {
if (locked) {
try {
@@ -242,7 +243,7 @@ public class DMaaPKafkaConsumerFactory implements ConsumerFactory {
fCache.dropAllConsumers();
}
-
+
private KafkaConsumerCache fCache;
private KafkaLiveLockAvoider2 fkafkaLiveLockAvoider;
private String fkafkaBrokers;
@@ -256,38 +257,36 @@ public class DMaaPKafkaConsumerFactory implements ConsumerFactory {
private void transferSettingIfProvided(Properties target, String key, String prefix) {
String keyVal = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, makeLongKey(key, prefix));
-
+
if (null != keyVal) {
-
+
log.info("Setting [" + key + "] to " + keyVal + ".");
target.put(key, keyVal);
}
}
/**
- * Name CreateConsumerconfig
+ * Name CreateConsumerconfig
* @param topic
* @param groupId
* @param consumerId
* @return Properties
- *
+ *
* This method is to create Properties required to create kafka connection
- * Group name is replaced with different format groupid--topic to address same
- * groupids for multiple topics. Same groupid with multiple topics
+ * Group name is replaced with different format groupid--topic to address same
+ * groupids for multiple topics. Same groupid with multiple topics
* may start frequent consumer rebalancing on all the topics . Replacing them makes it unique
*/
private Properties createConsumerConfig(String topic ,String groupId, String consumerId) {
final Properties props = new Properties();
//fakeGroupName is added to avoid multiple consumer group for multiple topics.Donot Change this logic
- //Fix for CPFMF-644 :
+ //Fix for CPFMF-644 :
final String fakeGroupName = groupId + "--" + topic;
props.put("group.id", fakeGroupName);
props.put("enable.auto.commit", "false"); // 0.11
props.put("bootstrap.servers", fkafkaBrokers);
if(Utils.isCadiEnabled()){
- props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='"+Utils.getKafkaproperty()+"';");
- props.put("security.protocol", "SASL_PLAINTEXT");
- props.put("sasl.mechanism", "PLAIN");
+ props.putAll(Utils.addSaslProps());
}
props.put("client.id", consumerId);
@@ -314,7 +313,7 @@ public class DMaaPKafkaConsumerFactory implements ConsumerFactory {
/**
* putting values in hashmap like consumer timeout, zookeeper time out, etc
- *
+ *
* @param setting
*/
private static void populateKafkaInternalDefaultsMap() { }
@@ -322,7 +321,7 @@ public class DMaaPKafkaConsumerFactory implements ConsumerFactory {
/*
* The starterIncremnt value is just to emulate calling certain consumers,
* in this test app all the consumers are local
- *
+ *
*/
private LiveLockAvoidance makeAvoidanceCallback(final String appId) {
@@ -346,7 +345,7 @@ public class DMaaPKafkaConsumerFactory implements ConsumerFactory {
@SuppressWarnings("rawtypes")
@Override
public HashMap getConsumerForKafka011(String topic, String consumerGroupName, String consumerId, int timeoutMs,
- String remotehost) throws UnavailableException, CambriaApiException {
+ String remotehost) throws UnavailableException, CambriaApiException {
// TODO Auto-generated method stub
return null;
}
diff --git a/src/main/java/org/onap/dmaap/dmf/mr/beans/DMaaPKafkaMetaBroker.java b/src/main/java/org/onap/dmaap/dmf/mr/beans/DMaaPKafkaMetaBroker.java
index 9ab4c83..7a08345 100644
--- a/src/main/java/org/onap/dmaap/dmf/mr/beans/DMaaPKafkaMetaBroker.java
+++ b/src/main/java/org/onap/dmaap/dmf/mr/beans/DMaaPKafkaMetaBroker.java
@@ -8,16 +8,16 @@
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
-*
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* ============LICENSE_END=========================================================
- *
+ *
* ECOMP is a trademark and service mark of AT&T Intellectual Property.
- *
+ *
*******************************************************************************/
package org.onap.dmaap.dmf.mr.beans;
@@ -55,7 +55,7 @@ import java.util.concurrent.ExecutionException;
/**
* class performing all topic operations
- *
+ *
* @author anowarul.islam
*
*/
@@ -73,32 +73,29 @@ public class DMaaPKafkaMetaBroker implements Broker1 {
fkafkaBrokers = "localhost:9092";
}
-
- props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, fkafkaBrokers );
- if(Utils.isCadiEnabled()){
- props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='"+Utils.getKafkaproperty()+"';");
- props.put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
- props.put("sasl.mechanism", "PLAIN");
- }
-
- fKafkaAdminClient=AdminClient.create ( props );
-
+
+ props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, fkafkaBrokers );
+ if(Utils.isCadiEnabled()){
+ props.putAll(Utils.addSaslProps());
+ }
+ fKafkaAdminClient=AdminClient.create ( props );
+
}
private static final EELFLogger log = EELFManager.getInstance().getLogger(ConfigurationReader.class);
private final AdminClient fKafkaAdminClient;
-
-
+
+
/**
* DMaaPKafkaMetaBroker constructor initializing
- *
+ *
* @param settings
* @param zk
* @param configDb
*/
public DMaaPKafkaMetaBroker(@Qualifier("propertyReader") rrNvReadable settings,
- @Qualifier("dMaaPZkClient") ZkClient zk, @Qualifier("dMaaPZkConfigDb") ConfigDb configDb) {
+ @Qualifier("dMaaPZkClient") ZkClient zk, @Qualifier("dMaaPZkConfigDb") ConfigDb configDb) {
fZk = zk;
fCambriaConfig = configDb;
fBaseTopicData = configDb.parse("/topics");
@@ -109,30 +106,28 @@ public class DMaaPKafkaMetaBroker implements Broker1 {
fkafkaBrokers = "localhost:9092";
}
-
- if(Utils.isCadiEnabled()){
- props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='"+Utils.getKafkaproperty()+"';");
- props.put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
- props.put("sasl.mechanism", "PLAIN");
- }
- props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, fkafkaBrokers );
-
- fKafkaAdminClient=AdminClient.create ( props );
-
-
-
+
+ if(Utils.isCadiEnabled()){
+ props.putAll(Utils.addSaslProps());
+ }
+ props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, fkafkaBrokers );
+
+ fKafkaAdminClient=AdminClient.create ( props );
+
+
+
}
-
+
public DMaaPKafkaMetaBroker( rrNvReadable settings,
- ZkClient zk, ConfigDb configDb,AdminClient client) {
-
+ ZkClient zk, ConfigDb configDb,AdminClient client) {
+
fZk = zk;
fCambriaConfig = configDb;
fBaseTopicData = configDb.parse("/topics");
- fKafkaAdminClient= client;
-
-
-
+ fKafkaAdminClient= client;
+
+
+
}
@Override
@@ -169,7 +164,7 @@ public class DMaaPKafkaMetaBroker implements Broker1 {
/**
* static method get KafkaTopic object
- *
+ *
* @param db
* @param base
* @param topic
@@ -185,7 +180,7 @@ public class DMaaPKafkaMetaBroker implements Broker1 {
*/
@Override
public Topic createTopic(String topic, String desc, String ownerApiKey, int partitions, int replicas,
- boolean transactionEnabled) throws TopicExistsException, CambriaApiException,ConfigDbException {
+ boolean transactionEnabled) throws TopicExistsException, CambriaApiException,ConfigDbException {
log.info("Creating topic: " + topic);
try {
log.info("Check if topic [" + topic + "] exist.");
@@ -216,23 +211,23 @@ public class DMaaPKafkaMetaBroker implements Broker1 {
// create via kafka
- try {
- final NewTopic topicRequest =
- new NewTopic(topic, partitions, (short)replicas);
- final CreateTopicsResult ctr =
- fKafkaAdminClient.createTopics(Arrays.asList(topicRequest));
- final KafkaFuture<Void> ctrResult = ctr.all();
- ctrResult.get();
- // underlying Kafka topic created. now setup our API info
- return createTopicEntry(topic, desc, ownerApiKey, transactionEnabled);
- } catch (InterruptedException e) {
- log.warn("Execution of describeTopics timed out.");
- throw new ConfigDbException(e);
- } catch (ExecutionException e) {
- log.warn("Execution of describeTopics failed: " + e.getCause().getMessage(), e);
- throw new ConfigDbException(e.getCause());
- }
-
+ try {
+ final NewTopic topicRequest =
+ new NewTopic(topic, partitions, (short)replicas);
+ final CreateTopicsResult ctr =
+ fKafkaAdminClient.createTopics(Arrays.asList(topicRequest));
+ final KafkaFuture<Void> ctrResult = ctr.all();
+ ctrResult.get();
+ // underlying Kafka topic created. now setup our API info
+ return createTopicEntry(topic, desc, ownerApiKey, transactionEnabled);
+ } catch (InterruptedException e) {
+ log.warn("Execution of describeTopics timed out.");
+ throw new ConfigDbException(e);
+ } catch (ExecutionException e) {
+ log.warn("Execution of describeTopics failed: " + e.getCause().getMessage(), e);
+ throw new ConfigDbException(e.getCause());
+ }
+
}
@Override
@@ -240,13 +235,13 @@ public class DMaaPKafkaMetaBroker implements Broker1 {
log.info("Deleting topic: " + topic);
try {
log.info("Loading zookeeper client for topic deletion.");
- // topic creation. (Otherwise, the topic is only partially created
+ // topic creation. (Otherwise, the topic is only partially created
// in ZK.)
-
-
+
+
fKafkaAdminClient.deleteTopics(Arrays.asList(topic));
log.info("Zookeeper client loaded successfully. Deleting topic.");
-
+
} catch (Exception e) {
log.error("Failed to delete topic [" + topic + "]. " + e.getMessage(), e);
throw new ConfigDbException(e);
@@ -265,7 +260,7 @@ public class DMaaPKafkaMetaBroker implements Broker1 {
/**
* method Providing KafkaTopic Object associated with owner and
* transactionenabled or not
- *
+ *
* @param name
* @param desc
* @param owner
@@ -280,7 +275,7 @@ public class DMaaPKafkaMetaBroker implements Broker1 {
/**
* static method giving kafka topic object
- *
+ *
* @param db
* @param basePath
* @param name
@@ -291,7 +286,7 @@ public class DMaaPKafkaMetaBroker implements Broker1 {
* @throws ConfigDbException
*/
public static KafkaTopic createTopicEntry(ConfigDb db, ConfigPath basePath, String name, String desc, String owner,
- boolean transactionEnabled) throws ConfigDbException {
+ boolean transactionEnabled) throws ConfigDbException {
final JSONObject o = new JSONObject();
o.put("owner", owner);
o.put("description", desc);
@@ -303,14 +298,14 @@ public class DMaaPKafkaMetaBroker implements Broker1 {
/**
* class performing all user opearation like user is eligible to read,
* write. permitting a user to write and read,
- *
+ *
* @author anowarul.islam
*
*/
public static class KafkaTopic implements Topic {
/**
* constructor initializes
- *
+ *
* @param name
* @param configdb
* @param baseTopic
@@ -330,26 +325,26 @@ public class DMaaPKafkaMetaBroker implements Broker1 {
fOwner = o.optString("owner", "");
fDesc = o.optString("description", "");
fTransactionEnabled = o.optBoolean("txenabled", false);// default
- // value is
- // false
+ // value is
+ // false
// if this topic has an owner, it needs both read/write ACLs. If there's no
- // owner (or it's empty), null is okay -- this is for existing or implicitly
- // created topics.
- JSONObject readers = o.optJSONObject ( "readers" );
- if ( readers == null && fOwner.length () > 0 )
- {
- readers = kEmptyAcl;
- }
- fReaders = fromJson ( readers );
-
- JSONObject writers = o.optJSONObject ( "writers" );
- if ( writers == null && fOwner.length () > 0 )
- {
- writers = kEmptyAcl;
- }
- fWriters = fromJson ( writers );
+ // owner (or it's empty), null is okay -- this is for existing or implicitly
+ // created topics.
+ JSONObject readers = o.optJSONObject ( "readers" );
+ if ( readers == null && fOwner.length () > 0 )
+ {
+ readers = kEmptyAcl;
+ }
+ fReaders = fromJson ( readers );
+
+ JSONObject writers = o.optJSONObject ( "writers" );
+ if ( writers == null && fOwner.length () > 0 )
+ {
+ writers = kEmptyAcl;
+ }
+ fWriters = fromJson ( writers );
}
-
+
private NsaAcl fromJson(JSONObject o) {
NsaAcl acl = new NsaAcl();
if (o != null) {
@@ -427,7 +422,7 @@ public class DMaaPKafkaMetaBroker implements Broker1 {
try
{
final NsaAcl acl = NsaAclUtils.updateAcl ( this, asUser, key, reader, add );
-
+
// we have to assume we have current data, or load it again. for the expected use
// case, assuming we can overwrite the data is fine.
final JSONObject o = new JSONObject ();
@@ -435,15 +430,15 @@ public class DMaaPKafkaMetaBroker implements Broker1 {
o.put ( "readers", safeSerialize ( reader ? acl : fReaders ) );
o.put ( "writers", safeSerialize ( reader ? fWriters : acl ) );
fConfigDb.store ( fBaseTopicData.getChild ( fName ), o.toString () );
-
+
log.info ( "ACL_UPDATE: " + asUser.getKey () + " " + ( add ? "added" : "removed" ) + ( reader?"subscriber":"publisher" ) + " " + key + " on " + fName );
-
+
}
catch ( ConfigDbException | AccessDeniedException x )
{
throw x;
}
-
+
}
private JSONObject safeSerialize(NsaAcl acl) {
@@ -458,7 +453,7 @@ public class DMaaPKafkaMetaBroker implements Broker1 {
private final NsaAcl fReaders;
private final NsaAcl fWriters;
private boolean fTransactionEnabled;
-
+
public boolean isTransactionEnabled() {
return fTransactionEnabled;
}
diff --git a/src/main/java/org/onap/dmaap/dmf/mr/utils/Utils.java b/src/main/java/org/onap/dmaap/dmf/mr/utils/Utils.java
index 662f0f7..c420072 100644
--- a/src/main/java/org/onap/dmaap/dmf/mr/utils/Utils.java
+++ b/src/main/java/org/onap/dmaap/dmf/mr/utils/Utils.java
@@ -8,30 +8,30 @@
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
-*
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* ============LICENSE_END=========================================================
- *
+ *
* ECOMP is a trademark and service mark of AT&T Intellectual Property.
- *
+ *
*******************************************************************************/
package org.onap.dmaap.dmf.mr.utils;
import com.att.eelf.configuration.EELFLogger;
import com.att.eelf.configuration.EELFManager;
-import org.onap.dmaap.dmf.mr.beans.DMaaPContext;
-
-import javax.servlet.http.HttpServletRequest;
import java.io.IOException;
import java.io.InputStream;
import java.security.Principal;
import java.text.DecimalFormat;
import java.text.SimpleDateFormat;
import java.util.*;
+import javax.servlet.http.HttpServletRequest;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.onap.dmaap.dmf.mr.beans.DMaaPContext;
/**
* This is an utility class for various operations for formatting
@@ -46,13 +46,14 @@ public class Utils {
private static final String BATCH_ID_FORMAT = "000000";
private static final String X509_ATTR = "javax.servlet.request.X509Certificate";
private static final EELFLogger log = EELFManager.getInstance().getLogger(Utils.class);
+ public static final String SASL_MECH = "sasl.mechanism";
private Utils() {
super();
}
/**
- * Formatting the date
+ * Formatting the date
* @param date
* @return date or null
*/
@@ -128,55 +129,71 @@ public class Utils {
*/
public static long getSleepMsForRate ( double ratePerMinute )
{
- if ( ratePerMinute <= 0.0 )
+ if ( ratePerMinute <= 0.0 )
{
return 0;
}
return Math.max ( 1000, Math.round ( 60 * 1000 / ratePerMinute ) );
}
- public static String getRemoteAddress(DMaaPContext ctx)
- {
- String reqAddr = ctx.getRequest().getRemoteAddr();
- String fwdHeader = getFirstHeader("X-Forwarded-For",ctx);
- return ((fwdHeader != null) ? fwdHeader : reqAddr);
- }
- public static String getFirstHeader(String h,DMaaPContext ctx)
- {
- List l = getHeader(h,ctx);
- return ((l.size() > 0) ? (String)l.iterator().next() : null);
- }
- public static List<String> getHeader(String h,DMaaPContext ctx)
- {
- LinkedList list = new LinkedList();
- Enumeration e = ctx.getRequest().getHeaders(h);
- while (e.hasMoreElements())
- {
- list.add(e.nextElement().toString());
- }
- return list;
- }
-
- public static String getKafkaproperty(){
- InputStream input = new Utils().getClass().getResourceAsStream("/kafka.properties");
- Properties props = new Properties();
- try {
- props.load(input);
- } catch (IOException e) {
- log.error("failed to read kafka.properties");
- }
- return props.getProperty("key");
-
-
- }
-
- public static boolean isCadiEnabled(){
- boolean enableCadi=false;
- if(System.getenv("enableCadi")!=null&&System.getenv("enableCadi").equals("true")){
- enableCadi=true;
- }
-
- return enableCadi;
- }
-
+ public static String getRemoteAddress(DMaaPContext ctx)
+ {
+ String reqAddr = ctx.getRequest().getRemoteAddr();
+ String fwdHeader = getFirstHeader("X-Forwarded-For",ctx);
+ return ((fwdHeader != null) ? fwdHeader : reqAddr);
+ }
+ public static String getFirstHeader(String h,DMaaPContext ctx)
+ {
+ List l = getHeader(h,ctx);
+ return ((l.size() > 0) ? (String)l.iterator().next() : null);
+ }
+ public static List<String> getHeader(String h,DMaaPContext ctx)
+ {
+ LinkedList list = new LinkedList();
+ Enumeration e = ctx.getRequest().getHeaders(h);
+ while (e.hasMoreElements())
+ {
+ list.add(e.nextElement().toString());
+ }
+ return list;
+ }
+
+ public static String getKafkaproperty(){
+ InputStream input = new Utils().getClass().getResourceAsStream("/kafka.properties");
+ Properties props = new Properties();
+ try {
+ props.load(input);
+ } catch (IOException e) {
+ log.error("failed to read kafka.properties");
+ }
+ return props.getProperty("key");
+
+
+ }
+
+ public static boolean isCadiEnabled(){
+ boolean enableCadi=false;
+ if(System.getenv("enableCadi")!=null&&System.getenv("enableCadi").equals("true")){
+ enableCadi=true;
+ }
+
+ return enableCadi;
+ }
+
+ public static Properties addSaslProps(){
+ Properties props = new Properties();
+ String saslMech = System.getenv("SASLMECH");
+ if (saslMech != null && saslMech.equals("scram-sha-512")) {
+ props.put("sasl.jaas.config", System.getenv("JAASLOGIN"));
+ props.put(SASL_MECH, saslMech.toUpperCase());
+ }
+ else {
+ props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='" + getKafkaproperty() + "';");
+ props.put(SASL_MECH, "PLAIN");
+ }
+ props.put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
+ log.info("KafkaAdmin sasl.mechanism set to " + props.getProperty(SASL_MECH));
+ return props;
+
+ }
}
diff --git a/src/test/java/org/onap/dmaap/mr/cambria/backends/kafka/KafkaPublisherTest.java b/src/test/java/org/onap/dmaap/mr/cambria/backends/kafka/KafkaPublisherTest.java
index 7a0fe78..44047e4 100644
--- a/src/test/java/org/onap/dmaap/mr/cambria/backends/kafka/KafkaPublisherTest.java
+++ b/src/test/java/org/onap/dmaap/mr/cambria/backends/kafka/KafkaPublisherTest.java
@@ -44,7 +44,7 @@ public class KafkaPublisherTest {
public void setUp() throws Exception {
MockitoAnnotations.initMocks(this);
PowerMockito.mockStatic(Utils.class);
- PowerMockito.when(Utils.isCadiEnabled()).thenReturn(true);
+ PowerMockito.when(Utils.isCadiEnabled()).thenReturn(false);
}
diff --git a/src/test/java/org/onap/dmaap/mr/cambria/embed/EmbedConfigurationReader.java b/src/test/java/org/onap/dmaap/mr/cambria/embed/EmbedConfigurationReader.java
index f49f615..9d7a931 100644
--- a/src/test/java/org/onap/dmaap/mr/cambria/embed/EmbedConfigurationReader.java
+++ b/src/test/java/org/onap/dmaap/mr/cambria/embed/EmbedConfigurationReader.java
@@ -9,7 +9,7 @@
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -18,26 +18,27 @@
* ============LICENSE_END=========================================================
*
* ECOMP is a trademark and service mark of AT&T Intellectual Property.
- *
+ *
*******************************************************************************/
- package org.onap.dmaap.mr.cambria.embed;
+package org.onap.dmaap.mr.cambria.embed;
+import com.att.ajsc.filemonitor.AJSCPropertiesMap;
+import com.att.nsa.security.db.BaseNsaApiDbImpl;
+import com.att.nsa.security.db.simple.NsaSimpleApiKey;
+import com.att.nsa.security.db.simple.NsaSimpleApiKeyFactory;
import java.io.File;
import java.util.Arrays;
import java.util.Map;
import java.util.Properties;
-
import org.apache.commons.io.FileUtils;
import org.apache.curator.framework.CuratorFramework;
-
-import com.att.ajsc.filemonitor.AJSCPropertiesMap;
-import org.onap.dmaap.dmf.mr.backends.kafka.KafkaPublisher;
-import org.onap.dmaap.dmf.mr.backends.memory.MemoryMetaBroker;
-import org.onap.dmaap.dmf.mr.backends.memory.MemoryQueue;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
+import org.onap.dmaap.dmf.mr.backends.kafka.KafkaPublisher;
+import org.onap.dmaap.dmf.mr.backends.memory.MemoryMetaBroker;
+import org.onap.dmaap.dmf.mr.backends.memory.MemoryQueue;
import org.onap.dmaap.dmf.mr.beans.DMaaPKafkaConsumerFactory;
import org.onap.dmaap.dmf.mr.beans.DMaaPKafkaMetaBroker;
import org.onap.dmaap.dmf.mr.beans.DMaaPMetricsSet;
@@ -49,13 +50,11 @@ import org.onap.dmaap.dmf.mr.security.DMaaPAuthenticatorImpl;
import org.onap.dmaap.dmf.mr.utils.ConfigurationReader;
import org.onap.dmaap.dmf.mr.utils.DMaaPCuratorFactory;
import org.onap.dmaap.dmf.mr.utils.PropertyReader;
-import com.att.nsa.security.db.BaseNsaApiDbImpl;
-import com.att.nsa.security.db.simple.NsaSimpleApiKey;
-import com.att.nsa.security.db.simple.NsaSimpleApiKeyFactory;
+import org.onap.dmaap.dmf.mr.utils.Utils;
public class EmbedConfigurationReader {
- private static final String DEFAULT_KAFKA_LOG_DIR = "/kafka_embedded";
+ private static final String DEFAULT_KAFKA_LOG_DIR = "/kafka_embedded";
public static final String TEST_TOPIC = "testTopic";
private static final int BROKER_ID = 0;
private static final int BROKER_PORT = 5000;
@@ -69,49 +68,49 @@ public class EmbedConfigurationReader {
String dir;
private AdminClient fKafkaAdminClient;
KafkaLocal kafkaLocal;
-
- public void setUp() throws Exception {
-
- ClassLoader classLoader = getClass().getClassLoader();
- AJSCPropertiesMap.refresh(new File(classLoader.getResource(CambriaConstants.msgRtr_prop).getFile()));
-
- Properties kafkaProperties;
+
+ public void setUp() throws Exception {
+
+ ClassLoader classLoader = getClass().getClassLoader();
+ AJSCPropertiesMap.refresh(new File(classLoader.getResource(CambriaConstants.msgRtr_prop).getFile()));
+
+ Properties kafkaProperties;
Properties zkProperties;
try {
//load properties
- dir = new File(classLoader.getResource(CambriaConstants.msgRtr_prop).getFile()).getParent();
+ dir = new File(classLoader.getResource(CambriaConstants.msgRtr_prop).getFile()).getParent();
kafkaProperties = getKafkaProperties(dir + DEFAULT_KAFKA_LOG_DIR, BROKER_PORT, BROKER_ID);
zkProperties = getZookeeperProperties(ZOOKEEPER_PORT,dir + DEFAULT_ZOOKEEPER_LOG_DIR);
//start kafkaLocalServer
kafkaLocal = new KafkaLocal(kafkaProperties, zkProperties);
-
+
Map<String, String> map = AJSCPropertiesMap.getProperties(CambriaConstants.msgRtr_prop);
map.put(CambriaConstants.kSetting_ZkConfigDbServers, ZOOKEEPER_HOST);
map.put("kafka.client.zookeeper", ZOOKEEPER_HOST);
map.put("kafka.metadata.broker.list", LOCALHOST_BROKER);
-
+
DMaaPZkClient dMaaPZkClient = new DMaaPZkClient(new PropertyReader());
-
+
final Properties props = new Properties ();
- props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092" );
- props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='admin_secret';");
- props.put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
- props.put("sasl.mechanism", "PLAIN");
- fKafkaAdminClient = AdminClient.create ( props );
-
- // if(!AdminUtils.topicExists(dMaaPZkClient, TEST_TOPIC))
+ props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092" );
+ props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='admin_secret';");
+ props.put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
+ props.put("sasl.mechanism", "PLAIN");
+ fKafkaAdminClient = AdminClient.create ( props );
+
+ // if(!AdminUtils.topicExists(dMaaPZkClient, TEST_TOPIC))
// AdminUtils.createTopic(dMaaPZkClient, TEST_TOPIC, 3, 1, new Properties());
- final NewTopic topicRequest = new NewTopic ( TEST_TOPIC, 3, new Integer(1).shortValue () );
- fKafkaAdminClient.createTopics ( Arrays.asList ( topicRequest ) );
+ final NewTopic topicRequest = new NewTopic ( TEST_TOPIC, 3, new Integer(1).shortValue () );
+ fKafkaAdminClient.createTopics ( Arrays.asList ( topicRequest ) );
Thread.sleep(5000);
} catch (Exception e){
e.printStackTrace(System.out);
- }
- }
-
- private static Properties getKafkaProperties(String logDir, int port, int brokerId) {
+ }
+ }
+
+ private static Properties getKafkaProperties(String logDir, int port, int brokerId) {
Properties properties = new Properties();
properties.put("port", port + "");
properties.put("broker.id", brokerId + "");
@@ -122,47 +121,47 @@ public class EmbedConfigurationReader {
properties.put("consumer.timeout.ms", -1);
return properties;
}
-
- private static Properties getZookeeperProperties(int port, String zookeeperDir) {
+
+ private static Properties getZookeeperProperties(int port, String zookeeperDir) {
Properties properties = new Properties();
properties.put("clientPort", port + "");
properties.put("dataDir", zookeeperDir);
return properties;
}
- public void tearDown() throws Exception {
- DMaaPZkClient dMaaPZkClient = new DMaaPZkClient(new PropertyReader());
- if(fKafkaAdminClient!=null)
- fKafkaAdminClient.deleteTopics(Arrays.asList(TEST_TOPIC));
- //AdminUtils.deleteTopic(dMaaPZkClient, TEST_TOPIC);
- //dMaaPZkClient.delete(dir + DEFAULT_KAFKA_LOG_DIR);
- //dMaaPZkClient.delete(dir + DEFAULT_ZOOKEEPER_LOG_DIR);
- kafkaLocal.stop();
- FileUtils.cleanDirectory(new File(dir + DEFAULT_KAFKA_LOG_DIR));
- }
-
-
- public ConfigurationReader buildConfigurationReader() throws Exception {
-
- setUp();
-
- PropertyReader propertyReader = new PropertyReader();
- DMaaPMetricsSet dMaaPMetricsSet = new DMaaPMetricsSet(propertyReader);
- DMaaPZkClient dMaaPZkClient = new DMaaPZkClient(propertyReader);
- DMaaPZkConfigDb dMaaPZkConfigDb = new DMaaPZkConfigDb(dMaaPZkClient, propertyReader);
- CuratorFramework curatorFramework = DMaaPCuratorFactory.getCurator(new PropertyReader());
- DMaaPKafkaConsumerFactory dMaaPKafkaConsumerFactory = new DMaaPKafkaConsumerFactory(dMaaPMetricsSet, curatorFramework,null);
- MemoryQueue memoryQueue = new MemoryQueue();
- MemoryMetaBroker memoryMetaBroker = new MemoryMetaBroker(memoryQueue, dMaaPZkConfigDb);
- BaseNsaApiDbImpl<NsaSimpleApiKey> baseNsaApiDbImpl = new BaseNsaApiDbImpl<>(dMaaPZkConfigDb, new NsaSimpleApiKeyFactory());
- DMaaPAuthenticator<NsaSimpleApiKey> dMaaPAuthenticator = new DMaaPAuthenticatorImpl<>(baseNsaApiDbImpl);
- KafkaPublisher kafkaPublisher = new KafkaPublisher(propertyReader);
- DMaaPKafkaMetaBroker dMaaPKafkaMetaBroker = new DMaaPKafkaMetaBroker(propertyReader, dMaaPZkClient, dMaaPZkConfigDb);
-
- return new ConfigurationReader(propertyReader,
- dMaaPMetricsSet, dMaaPZkClient, dMaaPZkConfigDb, kafkaPublisher,
- curatorFramework, dMaaPKafkaConsumerFactory, dMaaPKafkaMetaBroker,
- memoryQueue, memoryMetaBroker, baseNsaApiDbImpl, dMaaPAuthenticator);
-
- }
+ public void tearDown() throws Exception {
+ DMaaPZkClient dMaaPZkClient = new DMaaPZkClient(new PropertyReader());
+ if(fKafkaAdminClient!=null)
+ fKafkaAdminClient.deleteTopics(Arrays.asList(TEST_TOPIC));
+ //AdminUtils.deleteTopic(dMaaPZkClient, TEST_TOPIC);
+ //dMaaPZkClient.delete(dir + DEFAULT_KAFKA_LOG_DIR);
+ //dMaaPZkClient.delete(dir + DEFAULT_ZOOKEEPER_LOG_DIR);
+ kafkaLocal.stop();
+ FileUtils.cleanDirectory(new File(dir + DEFAULT_KAFKA_LOG_DIR));
+ }
+
+
+ public ConfigurationReader buildConfigurationReader() throws Exception {
+
+ setUp();
+
+ PropertyReader propertyReader = new PropertyReader();
+ DMaaPMetricsSet dMaaPMetricsSet = new DMaaPMetricsSet(propertyReader);
+ DMaaPZkClient dMaaPZkClient = new DMaaPZkClient(propertyReader);
+ DMaaPZkConfigDb dMaaPZkConfigDb = new DMaaPZkConfigDb(dMaaPZkClient, propertyReader);
+ CuratorFramework curatorFramework = DMaaPCuratorFactory.getCurator(new PropertyReader());
+ DMaaPKafkaConsumerFactory dMaaPKafkaConsumerFactory = new DMaaPKafkaConsumerFactory(dMaaPMetricsSet, curatorFramework,null);
+ MemoryQueue memoryQueue = new MemoryQueue();
+ MemoryMetaBroker memoryMetaBroker = new MemoryMetaBroker(memoryQueue, dMaaPZkConfigDb);
+ BaseNsaApiDbImpl<NsaSimpleApiKey> baseNsaApiDbImpl = new BaseNsaApiDbImpl<>(dMaaPZkConfigDb, new NsaSimpleApiKeyFactory());
+ DMaaPAuthenticator<NsaSimpleApiKey> dMaaPAuthenticator = new DMaaPAuthenticatorImpl<>(baseNsaApiDbImpl);
+ KafkaPublisher kafkaPublisher = new KafkaPublisher(propertyReader);
+ DMaaPKafkaMetaBroker dMaaPKafkaMetaBroker = new DMaaPKafkaMetaBroker(propertyReader, dMaaPZkClient, dMaaPZkConfigDb);
+
+ return new ConfigurationReader(propertyReader,
+ dMaaPMetricsSet, dMaaPZkClient, dMaaPZkConfigDb, kafkaPublisher,
+ curatorFramework, dMaaPKafkaConsumerFactory, dMaaPKafkaMetaBroker,
+ memoryQueue, memoryMetaBroker, baseNsaApiDbImpl, dMaaPAuthenticator);
+
+ }
}
diff --git a/src/test/java/org/onap/dmaap/mr/cambria/utils/UtilsTest.java b/src/test/java/org/onap/dmaap/mr/cambria/utils/UtilsTest.java
index 8a4009b..74f6750 100644
--- a/src/test/java/org/onap/dmaap/mr/cambria/utils/UtilsTest.java
+++ b/src/test/java/org/onap/dmaap/mr/cambria/utils/UtilsTest.java
@@ -1,5 +1,5 @@
/*******************************************************************************
-/*-
+ /*-
* ============LICENSE_START=======================================================
* ONAP Policy Engine
* ================================================================================
@@ -8,9 +8,9 @@
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -18,22 +18,26 @@
* limitations under the License.
* ============LICENSE_END=========================================================
*/
-
- package org.onap.dmaap.mr.cambria.utils;
+
+package org.onap.dmaap.mr.cambria.utils;
import static org.junit.Assert.*;
import java.security.Principal;
import java.text.SimpleDateFormat;
import java.util.Date;
+import java.util.Properties;
import org.apache.http.auth.BasicUserPrincipal;
import org.junit.After;
import org.junit.Before;
+import org.junit.Rule;
import org.junit.Test;
+import org.junit.contrib.java.lang.system.EnvironmentVariables;
import org.springframework.mock.web.MockHttpServletRequest;
+
import org.onap.dmaap.dmf.mr.beans.DMaaPContext;
import org.onap.dmaap.dmf.mr.utils.Utils;
@@ -41,6 +45,9 @@ public class UtilsTest {
private static final String DATE_FORMAT = "dd-MM-yyyy::hh:mm:ss:SSS";
+ @Rule
+ public EnvironmentVariables environmentVariables = new EnvironmentVariables();
+
@Before
public void setUp() throws Exception {
}
@@ -57,33 +64,33 @@ public class UtilsTest {
String expectedStr = sdf.format(now);
assertNotNull(dateStr);
assertTrue("Formatted date does not match - expected [" + expectedStr
- + "] received [" + dateStr + "]",
+ + "] received [" + dateStr + "]",
dateStr.equalsIgnoreCase(expectedStr));
}
-
+
@Test
public void testgetUserApiKey(){
MockHttpServletRequest request = new MockHttpServletRequest();
request.addHeader(Utils.CAMBRIA_AUTH_HEADER, "User:Password");
assertEquals("User", Utils.getUserApiKey(request));
-
+
MockHttpServletRequest request2 = new MockHttpServletRequest();
Principal principal = new BasicUserPrincipal("User@Test");
request2.setUserPrincipal(principal);
request2.addHeader("Authorization", "test");
assertEquals("User", Utils.getUserApiKey(request2));
-
+
MockHttpServletRequest request3 = new MockHttpServletRequest();
assertNull(Utils.getUserApiKey(request3));
}
-
+
@Test
public void testgetFromattedBatchSequenceId(){
Long x = new Long(1234);
String str = Utils.getFromattedBatchSequenceId(x);
- assertEquals("001234", str);
+ assertEquals("001234", str);
}
-
+
@Test
public void testmessageLengthInBytes(){
String str = "TestString";
@@ -99,38 +106,58 @@ public class UtilsTest {
assertNull(Utils.getResponseTransactionId(null));
assertNull(Utils.getResponseTransactionId(""));
}
-
+
@Test
public void testgetSleepMsForRate(){
long x = Utils.getSleepMsForRate(1024.124);
assertEquals(1000, x);
assertEquals(0, Utils.getSleepMsForRate(-1));
}
-
+
@Test
public void testgetRemoteAddress(){
DMaaPContext dMaapContext = new DMaaPContext();
MockHttpServletRequest request = new MockHttpServletRequest();
-
+
dMaapContext.setRequest(request);
-
+
assertEquals(request.getRemoteAddr(), Utils.getRemoteAddress(dMaapContext));
-
+
request.addHeader("X-Forwarded-For", "XForward");
assertEquals("XForward", Utils.getRemoteAddress(dMaapContext));
-
-
+
+
}
-
+
@Test
public void testGetKey(){
assertNotNull(Utils.getKafkaproperty());
-
+
}
-
+
@Test
public void testCadiEnable(){
assertFalse(Utils.isCadiEnabled());
-
+
+ }
+
+ @Test
+ public void testaddSaslPropsPlain() {
+ Properties props = new Properties();
+ props.put("security.protocol", "SASL_PLAINTEXT");
+ props.put(Utils.SASL_MECH, "PLAIN");
+ props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='admin_secret';");
+ assertEquals(props, Utils.addSaslProps());
+ }
+
+ @Test
+ public void testaddSaslPropsScram(){
+ Properties props = new Properties();
+ environmentVariables.set("SASLMECH", "scram-sha-512");
+ environmentVariables.set("JAASLOGIN", "org.apache.kafka.common.security.scram.ScramLoginModule required username='onap-dmaap-strimzi-kafka-admin' password='qul6A3TLvidY';");
+ props.put("security.protocol", "SASL_PLAINTEXT");
+ props.put(Utils.SASL_MECH, "SCRAM-SHA-512");
+ props.put("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username='onap-dmaap-strimzi-kafka-admin' password='qul6A3TLvidY';");
+ assertEquals(props, Utils.addSaslProps());
}
}