summaryrefslogtreecommitdiffstats
path: root/src/test/java/com/att/nsa/cambria/embed/EmbedConfigurationReader.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/test/java/com/att/nsa/cambria/embed/EmbedConfigurationReader.java')
-rw-r--r--src/test/java/com/att/nsa/cambria/embed/EmbedConfigurationReader.java57
1 files changed, 37 insertions, 20 deletions
diff --git a/src/test/java/com/att/nsa/cambria/embed/EmbedConfigurationReader.java b/src/test/java/com/att/nsa/cambria/embed/EmbedConfigurationReader.java
index 7233c6c..30cb460 100644
--- a/src/test/java/com/att/nsa/cambria/embed/EmbedConfigurationReader.java
+++ b/src/test/java/com/att/nsa/cambria/embed/EmbedConfigurationReader.java
@@ -23,6 +23,7 @@
package com.att.nsa.cambria.embed;
import java.io.File;
+import java.util.Arrays;
import java.util.Map;
import java.util.Properties;
@@ -30,25 +31,29 @@ import org.apache.commons.io.FileUtils;
import org.apache.curator.framework.CuratorFramework;
import com.att.ajsc.filemonitor.AJSCPropertiesMap;
-import com.att.nsa.cambria.backends.kafka.KafkaPublisher;
-import com.att.nsa.cambria.backends.memory.MemoryMetaBroker;
-import com.att.nsa.cambria.backends.memory.MemoryQueue;
-import com.att.nsa.cambria.beans.DMaaPKafkaConsumerFactory;
-import com.att.nsa.cambria.beans.DMaaPKafkaMetaBroker;
-import com.att.nsa.cambria.beans.DMaaPMetricsSet;
-import com.att.nsa.cambria.beans.DMaaPZkClient;
-import com.att.nsa.cambria.beans.DMaaPZkConfigDb;
-import com.att.nsa.cambria.constants.CambriaConstants;
-import com.att.nsa.cambria.security.DMaaPAuthenticator;
-import com.att.nsa.cambria.security.DMaaPAuthenticatorImpl;
-import com.att.nsa.cambria.utils.ConfigurationReader;
-import com.att.nsa.cambria.utils.DMaaPCuratorFactory;
-import com.att.nsa.cambria.utils.PropertyReader;
+import com.att.dmf.mr.backends.kafka.KafkaPublisher;
+import com.att.dmf.mr.backends.memory.MemoryMetaBroker;
+import com.att.dmf.mr.backends.memory.MemoryQueue;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.CreateTopicsResult;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.common.KafkaFuture;
+import com.att.dmf.mr.beans.DMaaPKafkaConsumerFactory;
+import com.att.dmf.mr.beans.DMaaPKafkaMetaBroker;
+import com.att.dmf.mr.beans.DMaaPMetricsSet;
+import com.att.dmf.mr.beans.DMaaPZkClient;
+import com.att.dmf.mr.beans.DMaaPZkConfigDb;
+import com.att.dmf.mr.constants.CambriaConstants;
+import com.att.dmf.mr.security.DMaaPAuthenticator;
+import com.att.dmf.mr.security.DMaaPAuthenticatorImpl;
+import com.att.dmf.mr.utils.ConfigurationReader;
+import com.att.dmf.mr.utils.DMaaPCuratorFactory;
+import com.att.dmf.mr.utils.PropertyReader;
import com.att.nsa.security.db.BaseNsaApiDbImpl;
import com.att.nsa.security.db.simple.NsaSimpleApiKey;
import com.att.nsa.security.db.simple.NsaSimpleApiKeyFactory;
-import kafka.admin.AdminUtils;
public class EmbedConfigurationReader {
private static final String DEFAULT_KAFKA_LOG_DIR = "/kafka_embedded";
@@ -63,7 +68,7 @@ public class EmbedConfigurationReader {
private static final String groupId = "groupID";
String dir;
-
+ private AdminClient fKafkaAdminClient;
KafkaLocal kafkaLocal;
public void setUp() throws Exception {
@@ -89,8 +94,18 @@ public class EmbedConfigurationReader {
map.put("kafka.metadata.broker.list", LOCALHOST_BROKER);
DMaaPZkClient dMaaPZkClient = new DMaaPZkClient(new PropertyReader());
- if(!AdminUtils.topicExists(dMaaPZkClient, TEST_TOPIC))
- AdminUtils.createTopic(dMaaPZkClient, TEST_TOPIC, 3, 1, new Properties());
+
+ final Properties props = new Properties ();
+ props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092" );
+ props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='admin_secret'");
+ props.put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
+ props.put("sasl.mechanism", "PLAIN");
+ fKafkaAdminClient = AdminClient.create ( props );
+
+ // if(!AdminUtils.topicExists(dMaaPZkClient, TEST_TOPIC))
+ // AdminUtils.createTopic(dMaaPZkClient, TEST_TOPIC, 3, 1, new Properties());
+ final NewTopic topicRequest = new NewTopic ( TEST_TOPIC, 3, new Integer(1).shortValue () );
+ fKafkaAdminClient.createTopics ( Arrays.asList ( topicRequest ) );
Thread.sleep(5000);
} catch (Exception e){
e.printStackTrace(System.out);
@@ -118,7 +133,9 @@ public class EmbedConfigurationReader {
public void tearDown() throws Exception {
DMaaPZkClient dMaaPZkClient = new DMaaPZkClient(new PropertyReader());
- AdminUtils.deleteTopic(dMaaPZkClient, TEST_TOPIC);
+ if(fKafkaAdminClient!=null)
+ fKafkaAdminClient.deleteTopics(Arrays.asList(TEST_TOPIC));
+ //AdminUtils.deleteTopic(dMaaPZkClient, TEST_TOPIC);
//dMaaPZkClient.delete(dir + DEFAULT_KAFKA_LOG_DIR);
//dMaaPZkClient.delete(dir + DEFAULT_ZOOKEEPER_LOG_DIR);
kafkaLocal.stop();
@@ -135,7 +152,7 @@ public class EmbedConfigurationReader {
DMaaPZkClient dMaaPZkClient = new DMaaPZkClient(propertyReader);
DMaaPZkConfigDb dMaaPZkConfigDb = new DMaaPZkConfigDb(dMaaPZkClient, propertyReader);
CuratorFramework curatorFramework = DMaaPCuratorFactory.getCurator(new PropertyReader());
- DMaaPKafkaConsumerFactory dMaaPKafkaConsumerFactory = new DMaaPKafkaConsumerFactory(propertyReader, dMaaPMetricsSet, curatorFramework);
+ DMaaPKafkaConsumerFactory dMaaPKafkaConsumerFactory = new DMaaPKafkaConsumerFactory(dMaaPMetricsSet, curatorFramework,null);
MemoryQueue memoryQueue = new MemoryQueue();
MemoryMetaBroker memoryMetaBroker = new MemoryMetaBroker(memoryQueue, dMaaPZkConfigDb);
BaseNsaApiDbImpl<NsaSimpleApiKey> baseNsaApiDbImpl = new BaseNsaApiDbImpl<>(dMaaPZkConfigDb, new NsaSimpleApiKeyFactory());