summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorliamfallon <liam.fallon@ericsson.com>2018-09-14 23:05:56 +0100
committerliamfallon <liam.fallon@ericsson.com>2018-09-15 20:00:59 +0100
commite13ff2c6faf63caab2d47fa63777e965e32ec642 (patch)
treedf11a546dbb03af6cf7a9eeaddd9d4a8275ce4cc
parenta65e4772f4557a109917532b2d9c49680ce3bb15 (diff)
Re-implement Kafka tests that periodically fail
The Kafla integration tests fail, this change re-implements the tests using a test framework from salesforce.com i Issue-ID: POLICY-1034 Change-Id: Iffcc9e0a9f419c8ec439771be7a7a58faa2f9860 Signed-off-by: liamfallon <liam.fallon@ericsson.com>
-rw-r--r--plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumer.java2
-rw-r--r--plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/KafkaCarrierTechnologyParameters.java9
-rw-r--r--pom.xml2
-rw-r--r--testsuites/integration/integration-uservice-test/pom.xml34
-rw-r--r--testsuites/integration/integration-uservice-test/src/test/java/org/onap/policy/apex/testsuites/integration/uservice/adapt/kafka/KafkaEventProducer.java82
-rw-r--r--testsuites/integration/integration-uservice-test/src/test/java/org/onap/policy/apex/testsuites/integration/uservice/adapt/kafka/KafkaEventSubscriber.java62
-rw-r--r--testsuites/integration/integration-uservice-test/src/test/java/org/onap/policy/apex/testsuites/integration/uservice/adapt/kafka/TestKafka2Kafka.java142
-rw-r--r--testsuites/integration/integration-uservice-test/src/test/resources/prodcons/Kafka2KafkaJsonEvent.json2
-rw-r--r--testsuites/integration/integration-uservice-test/src/test/resources/prodcons/Kafka2KafkaXmlEvent.json2
9 files changed, 107 insertions, 230 deletions
diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumer.java b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumer.java
index dfb12617c..be1b943d4 100644
--- a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumer.java
+++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumer.java
@@ -164,7 +164,7 @@ public class ApexKafkaConsumer implements ApexEventConsumer, Runnable {
while (consumerThread.isAlive() && !stopOrderedFlag) {
try {
final ConsumerRecords<String, String> records =
- kafkaConsumer.poll(kafkaConsumerProperties.getConsumerPollTime());
+ kafkaConsumer.poll(kafkaConsumerProperties.getConsumerPollDuration());
for (final ConsumerRecord<String, String> record : records) {
if (LOGGER.isTraceEnabled()) {
LOGGER.trace("event received for {} for forwarding to Apex engine : {} {}",
diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/KafkaCarrierTechnologyParameters.java b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/KafkaCarrierTechnologyParameters.java
index 9d7cc77f3..7c24ce1aa 100644
--- a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/KafkaCarrierTechnologyParameters.java
+++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/KafkaCarrierTechnologyParameters.java
@@ -20,6 +20,7 @@
package org.onap.policy.apex.plugins.event.carrier.kafka;
+import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.Properties;
@@ -263,6 +264,14 @@ public class KafkaCarrierTechnologyParameters extends CarrierTechnologyParameter
}
/**
+ * Gets the consumer poll duration.
+ * @return The poll duration
+ */
+ public Duration getConsumerPollDuration() {
+ return Duration.ofMillis(consumerPollTime);
+ }
+
+ /**
* Gets the consumer topic list.
*
* @return the consumer topic list
diff --git a/pom.xml b/pom.xml
index 18fce7126..cbe004414 100644
--- a/pom.xml
+++ b/pom.xml
@@ -41,7 +41,7 @@
<file.encoding>UTF-8</file.encoding>
<version.derby>10.13.1.1</version.derby>
<version.commons-cli>1.4</version.commons-cli>
- <version.kafka>1.1.1</version.kafka>
+ <version.kafka>2.0.0</version.kafka>
<version.jersey>2.26</version.jersey>
<version.eclipselink>2.6.5</version.eclipselink>
<version.hibernate>5.3.6.Final</version.hibernate>
diff --git a/testsuites/integration/integration-uservice-test/pom.xml b/testsuites/integration/integration-uservice-test/pom.xml
index 52a54c2dd..6466895c8 100644
--- a/testsuites/integration/integration-uservice-test/pom.xml
+++ b/testsuites/integration/integration-uservice-test/pom.xml
@@ -142,41 +142,15 @@
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
- <artifactId>kafka_2.11</artifactId>
+ <artifactId>kafka_2.12</artifactId>
<version>${version.kafka}</version>
<scope>test</scope>
- <exclusions>
- <exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-log4j12</artifactId>
- </exclusion>
- </exclusions>
</dependency>
<dependency>
- <groupId>org.apache.kafka</groupId>
- <artifactId>kafka_2.11</artifactId>
- <version>${version.kafka}</version>
- <classifier>test</classifier>
+ <groupId>com.salesforce.kafka.test</groupId>
+ <artifactId>kafka-junit4</artifactId>
+ <version>3.0.1</version>
<scope>test</scope>
- <exclusions>
- <exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-log4j12</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
- <groupId>org.apache.kafka</groupId>
- <artifactId>kafka-clients</artifactId>
- <version>${version.kafka}</version>
- <classifier>test</classifier>
- <scope>test</scope>
- <exclusions>
- <exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-log4j12</artifactId>
- </exclusion>
- </exclusions>
</dependency>
<dependency>
<groupId>org.apache.activemq.tooling</groupId>
diff --git a/testsuites/integration/integration-uservice-test/src/test/java/org/onap/policy/apex/testsuites/integration/uservice/adapt/kafka/KafkaEventProducer.java b/testsuites/integration/integration-uservice-test/src/test/java/org/onap/policy/apex/testsuites/integration/uservice/adapt/kafka/KafkaEventProducer.java
index 072d67864..63da4e696 100644
--- a/testsuites/integration/integration-uservice-test/src/test/java/org/onap/policy/apex/testsuites/integration/uservice/adapt/kafka/KafkaEventProducer.java
+++ b/testsuites/integration/integration-uservice-test/src/test/java/org/onap/policy/apex/testsuites/integration/uservice/adapt/kafka/KafkaEventProducer.java
@@ -20,12 +20,13 @@
package org.onap.policy.apex.testsuites.integration.uservice.adapt.kafka;
-import java.util.Properties;
+import com.salesforce.kafka.test.junit4.SharedKafkaTestResource;
+
import java.util.concurrent.TimeUnit;
-import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.StringSerializer;
import org.onap.policy.apex.core.infrastructure.threading.ThreadUtilities;
import org.onap.policy.apex.testsuites.integration.uservice.adapt.events.EventGenerator;
@@ -36,7 +37,7 @@ import org.onap.policy.apex.testsuites.integration.uservice.adapt.events.EventGe
*/
public class KafkaEventProducer implements Runnable {
private final String topic;
- private final String kafkaServerAddress;
+ private final SharedKafkaTestResource sharedKafkaTestResource;
private final int eventCount;
private final boolean xmlEvents;
private final long eventInterval;
@@ -50,15 +51,15 @@ public class KafkaEventProducer implements Runnable {
* Instantiates a new kafka event producer.
*
* @param topic the topic
- * @param kafkaServerAddress the kafka server address
+ * @param sharedKafkaTestResource the kafka server address
* @param eventCount the event count
* @param xmlEvents the xml events
* @param eventInterval the event interval
*/
- public KafkaEventProducer(final String topic, final String kafkaServerAddress, final int eventCount,
- final boolean xmlEvents, final long eventInterval) {
+ public KafkaEventProducer(final String topic, final SharedKafkaTestResource sharedKafkaTestResource,
+ final int eventCount, final boolean xmlEvents, final long eventInterval) {
this.topic = topic;
- this.kafkaServerAddress = kafkaServerAddress;
+ this.sharedKafkaTestResource = sharedKafkaTestResource;
this.eventCount = eventCount;
this.xmlEvents = xmlEvents;
this.eventInterval = eventInterval;
@@ -67,22 +68,15 @@ public class KafkaEventProducer implements Runnable {
producerThread.start();
}
- /* (non-Javadoc)
+ /*
+ * (non-Javadoc)
+ *
* @see java.lang.Runnable#run()
*/
@Override
public void run() {
- final Properties kafkaProducerProperties = new Properties();
- kafkaProducerProperties.put("bootstrap.servers", kafkaServerAddress);
- kafkaProducerProperties.put("acks", "all");
- kafkaProducerProperties.put("retries", 0);
- kafkaProducerProperties.put("batch.size", 16384);
- kafkaProducerProperties.put("linger.ms", 1);
- kafkaProducerProperties.put("buffer.memory", 33554432);
- kafkaProducerProperties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
- kafkaProducerProperties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
-
- final Producer<String, String> producer = new KafkaProducer<String, String>(kafkaProducerProperties);
+ final Producer<String, String> producer = sharedKafkaTestResource.getKafkaTestUtils()
+ .getKafkaProducer(StringSerializer.class, StringSerializer.class);
while (producerThread.isAlive() && !stopFlag) {
ThreadUtilities.sleep(50);
@@ -109,8 +103,8 @@ public class KafkaEventProducer implements Runnable {
* @param producer the producer
*/
private void sendEventsToTopic(final Producer<String, String> producer) {
- System.out.println(KafkaEventProducer.class.getCanonicalName() + ": sending events to Kafka server at "
- + kafkaServerAddress + ", event count " + eventCount + ", xmlEvents " + xmlEvents);
+ System.out.println(KafkaEventProducer.class.getCanonicalName()
+ + ": sending events to Kafka server , event count " + eventCount + ", xmlEvents " + xmlEvents);
for (int i = 0; i < eventCount; i++) {
System.out.println(KafkaEventProducer.class.getCanonicalName() + ": waiting " + eventInterval
@@ -126,7 +120,7 @@ public class KafkaEventProducer implements Runnable {
producer.send(new ProducerRecord<String, String>(topic, "Event" + i + "Of" + eventCount, eventString));
producer.flush();
eventsSentCount++;
- System.out.println(KafkaEventProducer.class.getCanonicalName() + ": sent event " + eventString);
+ System.out.println("****** Sent event No. " + eventsSentCount + " ******");
}
System.out.println(KafkaEventProducer.class.getCanonicalName() + ": completed");
}
@@ -154,48 +148,4 @@ public class KafkaEventProducer implements Runnable {
System.out.println(KafkaEventProducer.class.getCanonicalName() + ": stopped");
}
-
- /**
- * The main method.
- *
- * @param args the arguments
- */
- public static void main(final String[] args) {
- if (args.length != 5) {
- System.err.println("usage KafkaEventProducer topic kafkaServerAddress #events XML|JSON eventInterval");
- return;
- }
-
- int eventCount = 0;
- try {
- eventCount = Integer.parseInt(args[2]);
- } catch (final Exception e) {
- System.err.println("usage KafkaEventProducer topic kafkaServerAddress #events XML|JSON eventInterval");
- e.printStackTrace();
- return;
- }
-
- long eventInterval = 0;
- try {
- eventInterval = Long.parseLong(args[4]);
- } catch (final Exception e) {
- System.err.println("usage KafkaEventProducer topic kafkaServerAddress #events XML|JSON eventInterval");
- e.printStackTrace();
- return;
- }
-
- boolean xmlEvents = false;
- if (args[3].equalsIgnoreCase("XML")) {
- xmlEvents = true;
- } else if (!args[3].equalsIgnoreCase("JSON")) {
- System.err.println("usage KafkaEventProducer topic kafkaServerAddress #events XML|JSON eventInterval");
- return;
- }
-
- final KafkaEventProducer producer = new KafkaEventProducer(args[0], args[1], eventCount, xmlEvents,
- eventInterval);
-
- producer.sendEvents();
- producer.shutdown();
- }
}
diff --git a/testsuites/integration/integration-uservice-test/src/test/java/org/onap/policy/apex/testsuites/integration/uservice/adapt/kafka/KafkaEventSubscriber.java b/testsuites/integration/integration-uservice-test/src/test/java/org/onap/policy/apex/testsuites/integration/uservice/adapt/kafka/KafkaEventSubscriber.java
index 4b6a62e28..1c33289b7 100644
--- a/testsuites/integration/integration-uservice-test/src/test/java/org/onap/policy/apex/testsuites/integration/uservice/adapt/kafka/KafkaEventSubscriber.java
+++ b/testsuites/integration/integration-uservice-test/src/test/java/org/onap/policy/apex/testsuites/integration/uservice/adapt/kafka/KafkaEventSubscriber.java
@@ -20,12 +20,16 @@
package org.onap.policy.apex.testsuites.integration.uservice.adapt.kafka;
+import com.salesforce.kafka.test.junit4.SharedKafkaTestResource;
+
+import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.serialization.StringDeserializer;
import org.onap.policy.apex.core.infrastructure.messaging.MessagingException;
import org.onap.policy.apex.core.infrastructure.threading.ThreadUtilities;
@@ -35,8 +39,9 @@ import org.onap.policy.apex.core.infrastructure.threading.ThreadUtilities;
* @author Liam Fallon (liam.fallon@ericsson.com)
*/
public class KafkaEventSubscriber implements Runnable {
+ private static final Duration POLL_DURATION = Duration.ofMillis(100);
+
private final String topic;
- private final String kafkaServerAddress;
private long eventsReceivedCount = 0;
KafkaConsumer<String, String> consumer;
@@ -47,46 +52,44 @@ public class KafkaEventSubscriber implements Runnable {
* Instantiates a new kafka event subscriber.
*
* @param topic the topic
- * @param kafkaServerAddress the kafka server address
+ * @param sharedKafkaTestResource the kafka server address
* @throws MessagingException the messaging exception
*/
- public KafkaEventSubscriber(final String topic, final String kafkaServerAddress) throws MessagingException {
+ public KafkaEventSubscriber(final String topic, final SharedKafkaTestResource sharedKafkaTestResource)
+ throws MessagingException {
this.topic = topic;
- this.kafkaServerAddress = kafkaServerAddress;
-
- final Properties props = new Properties();
- props.put("bootstrap.servers", kafkaServerAddress);
- props.put("group.id", "test");
- props.put("enable.auto.commit", "true");
- props.put("auto.commit.interval.ms", "1000");
- props.put("session.timeout.ms", "30000");
- props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
- props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
-
- consumer = new KafkaConsumer<String, String>(props);
+
+
+ final Properties consumerProperties = new Properties();
+ consumerProperties.put("group.id", "test");
+
+
+ consumer = sharedKafkaTestResource.getKafkaTestUtils().getKafkaConsumer(StringDeserializer.class,
+ StringDeserializer.class, consumerProperties);
consumer.subscribe(Arrays.asList(topic));
subscriberThread = new Thread(this);
subscriberThread.start();
}
- /* (non-Javadoc)
+ /*
+ * (non-Javadoc)
+ *
* @see java.lang.Runnable#run()
*/
@Override
public void run() {
- System.out.println(KafkaEventSubscriber.class.getCanonicalName() + ": receiving events from Kafka server at "
- + kafkaServerAddress + " on topic " + topic);
+ System.out.println(KafkaEventSubscriber.class.getCanonicalName()
+ + ": receiving events from Kafka server on topic " + topic);
while (subscriberThread.isAlive() && !subscriberThread.isInterrupted()) {
try {
- final ConsumerRecords<String, String> records = consumer.poll(100);
+ final ConsumerRecords<String, String> records = consumer.poll(POLL_DURATION);
for (final ConsumerRecord<String, String> record : records) {
- System.out.println("******");
+ eventsReceivedCount++;
+ System.out.println("****** Received event No. " + eventsReceivedCount + " ******");
System.out.println("offset=" + record.offset());
System.out.println("key=" + record.key());
- System.out.println("name=" + record.value());
- eventsReceivedCount++;
}
} catch (final Exception e) {
// Thread interrupted
@@ -119,19 +122,4 @@ public class KafkaEventSubscriber implements Runnable {
consumer.close();
System.out.println(KafkaEventSubscriber.class.getCanonicalName() + ": stopped");
}
-
-
- /**
- * The main method.
- *
- * @param args the arguments
- * @throws MessagingException the messaging exception
- */
- public static void main(final String[] args) throws MessagingException {
- if (args.length != 2) {
- System.err.println("usage KafkaEventSubscriber topic kafkaServerAddress");
- return;
- }
- new KafkaEventSubscriber(args[0], args[1]);
- }
}
diff --git a/testsuites/integration/integration-uservice-test/src/test/java/org/onap/policy/apex/testsuites/integration/uservice/adapt/kafka/TestKafka2Kafka.java b/testsuites/integration/integration-uservice-test/src/test/java/org/onap/policy/apex/testsuites/integration/uservice/adapt/kafka/TestKafka2Kafka.java
index e70a597c2..6ab910506 100644
--- a/testsuites/integration/integration-uservice-test/src/test/java/org/onap/policy/apex/testsuites/integration/uservice/adapt/kafka/TestKafka2Kafka.java
+++ b/testsuites/integration/integration-uservice-test/src/test/java/org/onap/policy/apex/testsuites/integration/uservice/adapt/kafka/TestKafka2Kafka.java
@@ -21,104 +21,36 @@
package org.onap.policy.apex.testsuites.integration.uservice.adapt.kafka;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+import com.salesforce.kafka.test.junit4.SharedKafkaTestResource;
+
+import java.io.File;
import java.io.IOException;
-import java.nio.file.Files;
-import java.util.Properties;
-
-import kafka.admin.AdminUtils;
-import kafka.admin.RackAwareMode;
-import kafka.server.KafkaConfig;
-import kafka.server.KafkaServer;
-import kafka.utils.TestUtils;
-import kafka.utils.ZKStringSerializer$;
-import kafka.utils.ZkUtils;
-import kafka.zk.EmbeddedZookeeper;
-
-import org.I0Itec.zkclient.ZkClient;
-import org.apache.kafka.common.utils.MockTime;
-import org.apache.kafka.common.utils.Time;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
+
+import org.junit.ClassRule;
import org.junit.Test;
import org.onap.policy.apex.core.infrastructure.messaging.MessagingException;
import org.onap.policy.apex.core.infrastructure.threading.ThreadUtilities;
import org.onap.policy.apex.model.basicmodel.concepts.ApexException;
+import org.onap.policy.apex.model.utilities.TextFileUtils;
import org.onap.policy.apex.service.engine.main.ApexMain;
-
/**
- * The Class TestKafka2Kafka.
+ * The Class TestKafka2Kafka tests Kafka event sending and reception.
*/
public class TestKafka2Kafka {
- // The method of starting an embedded Kafka server used in this example is based on the method
- // on stack overflow at
- // https://github.com/asmaier/mini-kafka
-
- private static final long MAX_TEST_LENGTH = 20000;
+ private static final long MAX_TEST_LENGTH = 60000;
- private static final int EVENT_COUNT = 10;
+ private static final int EVENT_COUNT = 100;
private static final int EVENT_INTERVAL = 20;
- private static final String ZKHOST = "127.0.0.1";
- private static final String BROKERHOST = "127.0.0.1";
- private static final String BROKERPORT = "39902";
-
- private static EmbeddedZookeeper zkServer;
- private static ZkClient zkClient;
- private static KafkaServer kafkaServer;
-
- /**
- * Setup dummy kafka server.
- *
- * @throws IOException Signals that an I/O exception has occurred.
- */
- @BeforeClass
- public static void setupDummyKafkaServer() throws IOException {
- // setup Zookeeper
- zkServer = new EmbeddedZookeeper();
- final String zkConnect = ZKHOST + ":" + zkServer.port();
- zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer$.MODULE$);
- final ZkUtils zkUtils = ZkUtils.apply(zkClient, false);
-
- // setup Broker
- final Properties brokerProps = new Properties();
- brokerProps.setProperty("zookeeper.connect", zkConnect);
- brokerProps.setProperty("broker.id", "0");
- brokerProps.setProperty("log.dirs", Files.createTempDirectory("kafka-").toAbsolutePath().toString());
- brokerProps.setProperty("listeners", "PLAINTEXT://" + BROKERHOST + ":" + BROKERPORT);
- brokerProps.setProperty("offsets.topic.replication.factor", "1");
- brokerProps.setProperty("transaction.state.log.replication.factor", "1");
- brokerProps.setProperty("transaction.state.log.min.isr", "1");
- final KafkaConfig config = new KafkaConfig(brokerProps);
- final Time mock = new MockTime();
- kafkaServer = TestUtils.createServer(config, mock);
- kafkaServer.startup();
-
- // create topics
- AdminUtils.createTopic(zkUtils, "apex-in-0", 1, 1, new Properties(), RackAwareMode.Disabled$.MODULE$);
- AdminUtils.createTopic(zkUtils, "apex-in-1", 1, 1, new Properties(), RackAwareMode.Disabled$.MODULE$);
- AdminUtils.createTopic(zkUtils, "apex-out", 1, 1, new Properties(), RackAwareMode.Disabled$.MODULE$);
-
- }
-
- /**
- * Shutdown dummy kafka server.
- *
- * @throws IOException Signals that an I/O exception has occurred.
- */
- @AfterClass
- public static void shutdownDummyKafkaServer() throws IOException {
- if (kafkaServer != null) {
- kafkaServer.shutdown();
- }
- if (zkClient != null) {
- zkClient.close();
- }
- if (zkServer != null) {
- zkServer.shutdown();
- }
- }
+ @ClassRule
+ public static final SharedKafkaTestResource sharedKafkaTestResource = new SharedKafkaTestResource()
+ // Start a cluster with 1 brokers.
+ .withBrokers(1)
+ // Disable topic auto-creation.
+ .withBrokerProperty("auto.create.topics.enable", "false");
/**
* Test json kafka events.
@@ -128,7 +60,8 @@ public class TestKafka2Kafka {
*/
@Test
public void testJsonKafkaEvents() throws MessagingException, ApexException {
- final String[] args = {"src/test/resources/prodcons/Kafka2KafkaJsonEvent.json"};
+ final String[] args =
+ { "src/test/resources/prodcons/Kafka2KafkaJsonEvent.json" };
testKafkaEvents(args, false, "json");
}
@@ -140,7 +73,8 @@ public class TestKafka2Kafka {
*/
@Test
public void testXmlKafkaEvents() throws MessagingException, ApexException {
- final String[] args = {"src/test/resources/prodcons/Kafka2KafkaXmlEvent.json"};
+ final String[] args =
+ { "src/test/resources/prodcons/Kafka2KafkaXmlEvent.json" };
testKafkaEvents(args, true, "xml");
}
@@ -153,23 +87,45 @@ public class TestKafka2Kafka {
* @throws MessagingException the messaging exception
* @throws ApexException the apex exception
*/
- private void testKafkaEvents(final String[] args, final Boolean xmlEvents, final String topicSuffix)
- throws MessagingException, ApexException {
- final KafkaEventSubscriber subscriber =
- new KafkaEventSubscriber("apex-out-" + topicSuffix, "localhost:" + BROKERPORT);
+ private void testKafkaEvents(String[] args, final Boolean xmlEvents, final String topicSuffix)
+ throws MessagingException, ApexException {
+
+ try {
+ File tempConfigFile = File.createTempFile("Kafka_", ".json");
+ tempConfigFile.deleteOnExit();
+ String configAsString = TextFileUtils.getTextFileAsString(args[0]).replaceAll("localhost:39902",
+ sharedKafkaTestResource.getKafkaConnectString());
+ TextFileUtils.putStringAsFile(configAsString, tempConfigFile.getCanonicalFile());
+ args[0] = tempConfigFile.getCanonicalPath();
+
+ } catch (IOException e) {
+ fail("test should not throw an exception");
+ }
+
+ sharedKafkaTestResource.getKafkaTestUtils().createTopic("apex-out-" + topicSuffix, 1, (short) 1);
+ sharedKafkaTestResource.getKafkaTestUtils().createTopic("apex-in-" + topicSuffix, 1, (short) 1);
+
+ final KafkaEventSubscriber subscriber = new KafkaEventSubscriber("apex-out-" + topicSuffix,
+ sharedKafkaTestResource);
final ApexMain apexMain = new ApexMain(args);
ThreadUtilities.sleep(3000);
- final KafkaEventProducer producer = new KafkaEventProducer("apex-in-" + topicSuffix, "localhost:" + BROKERPORT,
+ final KafkaEventProducer producer = new KafkaEventProducer("apex-in-" + topicSuffix, sharedKafkaTestResource,
EVENT_COUNT, xmlEvents, EVENT_INTERVAL);
producer.sendEvents();
-
+
final long testStartTime = System.currentTimeMillis();
+ // Wait for the producer to send all tis events
+ while (System.currentTimeMillis() < testStartTime + MAX_TEST_LENGTH
+ && producer.getEventsSentCount() < EVENT_COUNT) {
+ ThreadUtilities.sleep(EVENT_INTERVAL);
+ }
+
while (System.currentTimeMillis() < testStartTime + MAX_TEST_LENGTH
- && subscriber.getEventsReceivedCount() < EVENT_COUNT) {
+ && subscriber.getEventsReceivedCount() < EVENT_COUNT) {
ThreadUtilities.sleep(EVENT_INTERVAL);
}
diff --git a/testsuites/integration/integration-uservice-test/src/test/resources/prodcons/Kafka2KafkaJsonEvent.json b/testsuites/integration/integration-uservice-test/src/test/resources/prodcons/Kafka2KafkaJsonEvent.json
index cd758b18d..f861c27bc 100644
--- a/testsuites/integration/integration-uservice-test/src/test/resources/prodcons/Kafka2KafkaJsonEvent.json
+++ b/testsuites/integration/integration-uservice-test/src/test/resources/prodcons/Kafka2KafkaJsonEvent.json
@@ -43,7 +43,7 @@
"parameterClassName": "org.onap.policy.apex.plugins.event.carrier.kafka.KafkaCarrierTechnologyParameters",
"parameters": {
"bootstrapServers": "localhost:39902",
- "groupId": "apex-group-id",
+ "groupId": "apex-group",
"enableAutoCommit": true,
"autoCommitTime": 1000,
"sessionTimeout": 30000,
diff --git a/testsuites/integration/integration-uservice-test/src/test/resources/prodcons/Kafka2KafkaXmlEvent.json b/testsuites/integration/integration-uservice-test/src/test/resources/prodcons/Kafka2KafkaXmlEvent.json
index d4468a57e..f18ecc2ed 100644
--- a/testsuites/integration/integration-uservice-test/src/test/resources/prodcons/Kafka2KafkaXmlEvent.json
+++ b/testsuites/integration/integration-uservice-test/src/test/resources/prodcons/Kafka2KafkaXmlEvent.json
@@ -44,7 +44,7 @@
"parameterClassName": "org.onap.policy.apex.plugins.event.carrier.kafka.KafkaCarrierTechnologyParameters",
"parameters": {
"bootstrapServers": "localhost:39902",
- "groupId": "apex-group-id",
+ "groupId": "apex-group",
"enableAutoCommit": true,
"autoCommitTime": 1000,
"sessionTimeout": 30000,