From e13ff2c6faf63caab2d47fa63777e965e32ec642 Mon Sep 17 00:00:00 2001 From: liamfallon Date: Fri, 14 Sep 2018 23:05:56 +0100 Subject: Re-implement Kafka tests that periodically fail The Kafla integration tests fail, this change re-implements the tests using a test framework from salesforce.com i Issue-ID: POLICY-1034 Change-Id: Iffcc9e0a9f419c8ec439771be7a7a58faa2f9860 Signed-off-by: liamfallon --- .../uservice/adapt/kafka/KafkaEventProducer.java | 82 +++--------- .../uservice/adapt/kafka/KafkaEventSubscriber.java | 62 ++++----- .../uservice/adapt/kafka/TestKafka2Kafka.java | 142 +++++++-------------- 3 files changed, 90 insertions(+), 196 deletions(-) (limited to 'testsuites/integration/integration-uservice-test/src/test/java') diff --git a/testsuites/integration/integration-uservice-test/src/test/java/org/onap/policy/apex/testsuites/integration/uservice/adapt/kafka/KafkaEventProducer.java b/testsuites/integration/integration-uservice-test/src/test/java/org/onap/policy/apex/testsuites/integration/uservice/adapt/kafka/KafkaEventProducer.java index 072d67864..63da4e696 100644 --- a/testsuites/integration/integration-uservice-test/src/test/java/org/onap/policy/apex/testsuites/integration/uservice/adapt/kafka/KafkaEventProducer.java +++ b/testsuites/integration/integration-uservice-test/src/test/java/org/onap/policy/apex/testsuites/integration/uservice/adapt/kafka/KafkaEventProducer.java @@ -20,12 +20,13 @@ package org.onap.policy.apex.testsuites.integration.uservice.adapt.kafka; -import java.util.Properties; +import com.salesforce.kafka.test.junit4.SharedKafkaTestResource; + import java.util.concurrent.TimeUnit; -import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.serialization.StringSerializer; import org.onap.policy.apex.core.infrastructure.threading.ThreadUtilities; import org.onap.policy.apex.testsuites.integration.uservice.adapt.events.EventGenerator; @@ -36,7 +37,7 @@ import org.onap.policy.apex.testsuites.integration.uservice.adapt.events.EventGe */ public class KafkaEventProducer implements Runnable { private final String topic; - private final String kafkaServerAddress; + private final SharedKafkaTestResource sharedKafkaTestResource; private final int eventCount; private final boolean xmlEvents; private final long eventInterval; @@ -50,15 +51,15 @@ public class KafkaEventProducer implements Runnable { * Instantiates a new kafka event producer. * * @param topic the topic - * @param kafkaServerAddress the kafka server address + * @param sharedKafkaTestResource the kafka server address * @param eventCount the event count * @param xmlEvents the xml events * @param eventInterval the event interval */ - public KafkaEventProducer(final String topic, final String kafkaServerAddress, final int eventCount, - final boolean xmlEvents, final long eventInterval) { + public KafkaEventProducer(final String topic, final SharedKafkaTestResource sharedKafkaTestResource, + final int eventCount, final boolean xmlEvents, final long eventInterval) { this.topic = topic; - this.kafkaServerAddress = kafkaServerAddress; + this.sharedKafkaTestResource = sharedKafkaTestResource; this.eventCount = eventCount; this.xmlEvents = xmlEvents; this.eventInterval = eventInterval; @@ -67,22 +68,15 @@ public class KafkaEventProducer implements Runnable { producerThread.start(); } - /* (non-Javadoc) + /* + * (non-Javadoc) + * * @see java.lang.Runnable#run() */ @Override public void run() { - final Properties kafkaProducerProperties = new Properties(); - kafkaProducerProperties.put("bootstrap.servers", kafkaServerAddress); - kafkaProducerProperties.put("acks", "all"); - kafkaProducerProperties.put("retries", 0); - kafkaProducerProperties.put("batch.size", 16384); - kafkaProducerProperties.put("linger.ms", 1); - kafkaProducerProperties.put("buffer.memory", 33554432); - kafkaProducerProperties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); - kafkaProducerProperties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); - - final Producer producer = new KafkaProducer(kafkaProducerProperties); + final Producer producer = sharedKafkaTestResource.getKafkaTestUtils() + .getKafkaProducer(StringSerializer.class, StringSerializer.class); while (producerThread.isAlive() && !stopFlag) { ThreadUtilities.sleep(50); @@ -109,8 +103,8 @@ public class KafkaEventProducer implements Runnable { * @param producer the producer */ private void sendEventsToTopic(final Producer producer) { - System.out.println(KafkaEventProducer.class.getCanonicalName() + ": sending events to Kafka server at " - + kafkaServerAddress + ", event count " + eventCount + ", xmlEvents " + xmlEvents); + System.out.println(KafkaEventProducer.class.getCanonicalName() + + ": sending events to Kafka server , event count " + eventCount + ", xmlEvents " + xmlEvents); for (int i = 0; i < eventCount; i++) { System.out.println(KafkaEventProducer.class.getCanonicalName() + ": waiting " + eventInterval @@ -126,7 +120,7 @@ public class KafkaEventProducer implements Runnable { producer.send(new ProducerRecord(topic, "Event" + i + "Of" + eventCount, eventString)); producer.flush(); eventsSentCount++; - System.out.println(KafkaEventProducer.class.getCanonicalName() + ": sent event " + eventString); + System.out.println("****** Sent event No. " + eventsSentCount + " ******"); } System.out.println(KafkaEventProducer.class.getCanonicalName() + ": completed"); } @@ -154,48 +148,4 @@ public class KafkaEventProducer implements Runnable { System.out.println(KafkaEventProducer.class.getCanonicalName() + ": stopped"); } - - /** - * The main method. - * - * @param args the arguments - */ - public static void main(final String[] args) { - if (args.length != 5) { - System.err.println("usage KafkaEventProducer topic kafkaServerAddress #events XML|JSON eventInterval"); - return; - } - - int eventCount = 0; - try { - eventCount = Integer.parseInt(args[2]); - } catch (final Exception e) { - System.err.println("usage KafkaEventProducer topic kafkaServerAddress #events XML|JSON eventInterval"); - e.printStackTrace(); - return; - } - - long eventInterval = 0; - try { - eventInterval = Long.parseLong(args[4]); - } catch (final Exception e) { - System.err.println("usage KafkaEventProducer topic kafkaServerAddress #events XML|JSON eventInterval"); - e.printStackTrace(); - return; - } - - boolean xmlEvents = false; - if (args[3].equalsIgnoreCase("XML")) { - xmlEvents = true; - } else if (!args[3].equalsIgnoreCase("JSON")) { - System.err.println("usage KafkaEventProducer topic kafkaServerAddress #events XML|JSON eventInterval"); - return; - } - - final KafkaEventProducer producer = new KafkaEventProducer(args[0], args[1], eventCount, xmlEvents, - eventInterval); - - producer.sendEvents(); - producer.shutdown(); - } } diff --git a/testsuites/integration/integration-uservice-test/src/test/java/org/onap/policy/apex/testsuites/integration/uservice/adapt/kafka/KafkaEventSubscriber.java b/testsuites/integration/integration-uservice-test/src/test/java/org/onap/policy/apex/testsuites/integration/uservice/adapt/kafka/KafkaEventSubscriber.java index 4b6a62e28..1c33289b7 100644 --- a/testsuites/integration/integration-uservice-test/src/test/java/org/onap/policy/apex/testsuites/integration/uservice/adapt/kafka/KafkaEventSubscriber.java +++ b/testsuites/integration/integration-uservice-test/src/test/java/org/onap/policy/apex/testsuites/integration/uservice/adapt/kafka/KafkaEventSubscriber.java @@ -20,12 +20,16 @@ package org.onap.policy.apex.testsuites.integration.uservice.adapt.kafka; +import com.salesforce.kafka.test.junit4.SharedKafkaTestResource; + +import java.time.Duration; import java.util.Arrays; import java.util.Properties; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.serialization.StringDeserializer; import org.onap.policy.apex.core.infrastructure.messaging.MessagingException; import org.onap.policy.apex.core.infrastructure.threading.ThreadUtilities; @@ -35,8 +39,9 @@ import org.onap.policy.apex.core.infrastructure.threading.ThreadUtilities; * @author Liam Fallon (liam.fallon@ericsson.com) */ public class KafkaEventSubscriber implements Runnable { + private static final Duration POLL_DURATION = Duration.ofMillis(100); + private final String topic; - private final String kafkaServerAddress; private long eventsReceivedCount = 0; KafkaConsumer consumer; @@ -47,46 +52,44 @@ public class KafkaEventSubscriber implements Runnable { * Instantiates a new kafka event subscriber. * * @param topic the topic - * @param kafkaServerAddress the kafka server address + * @param sharedKafkaTestResource the kafka server address * @throws MessagingException the messaging exception */ - public KafkaEventSubscriber(final String topic, final String kafkaServerAddress) throws MessagingException { + public KafkaEventSubscriber(final String topic, final SharedKafkaTestResource sharedKafkaTestResource) + throws MessagingException { this.topic = topic; - this.kafkaServerAddress = kafkaServerAddress; - - final Properties props = new Properties(); - props.put("bootstrap.servers", kafkaServerAddress); - props.put("group.id", "test"); - props.put("enable.auto.commit", "true"); - props.put("auto.commit.interval.ms", "1000"); - props.put("session.timeout.ms", "30000"); - props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); - props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); - - consumer = new KafkaConsumer(props); + + + final Properties consumerProperties = new Properties(); + consumerProperties.put("group.id", "test"); + + + consumer = sharedKafkaTestResource.getKafkaTestUtils().getKafkaConsumer(StringDeserializer.class, + StringDeserializer.class, consumerProperties); consumer.subscribe(Arrays.asList(topic)); subscriberThread = new Thread(this); subscriberThread.start(); } - /* (non-Javadoc) + /* + * (non-Javadoc) + * * @see java.lang.Runnable#run() */ @Override public void run() { - System.out.println(KafkaEventSubscriber.class.getCanonicalName() + ": receiving events from Kafka server at " - + kafkaServerAddress + " on topic " + topic); + System.out.println(KafkaEventSubscriber.class.getCanonicalName() + + ": receiving events from Kafka server on topic " + topic); while (subscriberThread.isAlive() && !subscriberThread.isInterrupted()) { try { - final ConsumerRecords records = consumer.poll(100); + final ConsumerRecords records = consumer.poll(POLL_DURATION); for (final ConsumerRecord record : records) { - System.out.println("******"); + eventsReceivedCount++; + System.out.println("****** Received event No. " + eventsReceivedCount + " ******"); System.out.println("offset=" + record.offset()); System.out.println("key=" + record.key()); - System.out.println("name=" + record.value()); - eventsReceivedCount++; } } catch (final Exception e) { // Thread interrupted @@ -119,19 +122,4 @@ public class KafkaEventSubscriber implements Runnable { consumer.close(); System.out.println(KafkaEventSubscriber.class.getCanonicalName() + ": stopped"); } - - - /** - * The main method. - * - * @param args the arguments - * @throws MessagingException the messaging exception - */ - public static void main(final String[] args) throws MessagingException { - if (args.length != 2) { - System.err.println("usage KafkaEventSubscriber topic kafkaServerAddress"); - return; - } - new KafkaEventSubscriber(args[0], args[1]); - } } diff --git a/testsuites/integration/integration-uservice-test/src/test/java/org/onap/policy/apex/testsuites/integration/uservice/adapt/kafka/TestKafka2Kafka.java b/testsuites/integration/integration-uservice-test/src/test/java/org/onap/policy/apex/testsuites/integration/uservice/adapt/kafka/TestKafka2Kafka.java index e70a597c2..6ab910506 100644 --- a/testsuites/integration/integration-uservice-test/src/test/java/org/onap/policy/apex/testsuites/integration/uservice/adapt/kafka/TestKafka2Kafka.java +++ b/testsuites/integration/integration-uservice-test/src/test/java/org/onap/policy/apex/testsuites/integration/uservice/adapt/kafka/TestKafka2Kafka.java @@ -21,104 +21,36 @@ package org.onap.policy.apex.testsuites.integration.uservice.adapt.kafka; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; +import com.salesforce.kafka.test.junit4.SharedKafkaTestResource; + +import java.io.File; import java.io.IOException; -import java.nio.file.Files; -import java.util.Properties; - -import kafka.admin.AdminUtils; -import kafka.admin.RackAwareMode; -import kafka.server.KafkaConfig; -import kafka.server.KafkaServer; -import kafka.utils.TestUtils; -import kafka.utils.ZKStringSerializer$; -import kafka.utils.ZkUtils; -import kafka.zk.EmbeddedZookeeper; - -import org.I0Itec.zkclient.ZkClient; -import org.apache.kafka.common.utils.MockTime; -import org.apache.kafka.common.utils.Time; -import org.junit.AfterClass; -import org.junit.BeforeClass; + +import org.junit.ClassRule; import org.junit.Test; import org.onap.policy.apex.core.infrastructure.messaging.MessagingException; import org.onap.policy.apex.core.infrastructure.threading.ThreadUtilities; import org.onap.policy.apex.model.basicmodel.concepts.ApexException; +import org.onap.policy.apex.model.utilities.TextFileUtils; import org.onap.policy.apex.service.engine.main.ApexMain; - /** - * The Class TestKafka2Kafka. + * The Class TestKafka2Kafka tests Kafka event sending and reception. */ public class TestKafka2Kafka { - // The method of starting an embedded Kafka server used in this example is based on the method - // on stack overflow at - // https://github.com/asmaier/mini-kafka - - private static final long MAX_TEST_LENGTH = 20000; + private static final long MAX_TEST_LENGTH = 60000; - private static final int EVENT_COUNT = 10; + private static final int EVENT_COUNT = 100; private static final int EVENT_INTERVAL = 20; - private static final String ZKHOST = "127.0.0.1"; - private static final String BROKERHOST = "127.0.0.1"; - private static final String BROKERPORT = "39902"; - - private static EmbeddedZookeeper zkServer; - private static ZkClient zkClient; - private static KafkaServer kafkaServer; - - /** - * Setup dummy kafka server. - * - * @throws IOException Signals that an I/O exception has occurred. - */ - @BeforeClass - public static void setupDummyKafkaServer() throws IOException { - // setup Zookeeper - zkServer = new EmbeddedZookeeper(); - final String zkConnect = ZKHOST + ":" + zkServer.port(); - zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer$.MODULE$); - final ZkUtils zkUtils = ZkUtils.apply(zkClient, false); - - // setup Broker - final Properties brokerProps = new Properties(); - brokerProps.setProperty("zookeeper.connect", zkConnect); - brokerProps.setProperty("broker.id", "0"); - brokerProps.setProperty("log.dirs", Files.createTempDirectory("kafka-").toAbsolutePath().toString()); - brokerProps.setProperty("listeners", "PLAINTEXT://" + BROKERHOST + ":" + BROKERPORT); - brokerProps.setProperty("offsets.topic.replication.factor", "1"); - brokerProps.setProperty("transaction.state.log.replication.factor", "1"); - brokerProps.setProperty("transaction.state.log.min.isr", "1"); - final KafkaConfig config = new KafkaConfig(brokerProps); - final Time mock = new MockTime(); - kafkaServer = TestUtils.createServer(config, mock); - kafkaServer.startup(); - - // create topics - AdminUtils.createTopic(zkUtils, "apex-in-0", 1, 1, new Properties(), RackAwareMode.Disabled$.MODULE$); - AdminUtils.createTopic(zkUtils, "apex-in-1", 1, 1, new Properties(), RackAwareMode.Disabled$.MODULE$); - AdminUtils.createTopic(zkUtils, "apex-out", 1, 1, new Properties(), RackAwareMode.Disabled$.MODULE$); - - } - - /** - * Shutdown dummy kafka server. - * - * @throws IOException Signals that an I/O exception has occurred. - */ - @AfterClass - public static void shutdownDummyKafkaServer() throws IOException { - if (kafkaServer != null) { - kafkaServer.shutdown(); - } - if (zkClient != null) { - zkClient.close(); - } - if (zkServer != null) { - zkServer.shutdown(); - } - } + @ClassRule + public static final SharedKafkaTestResource sharedKafkaTestResource = new SharedKafkaTestResource() + // Start a cluster with 1 brokers. + .withBrokers(1) + // Disable topic auto-creation. + .withBrokerProperty("auto.create.topics.enable", "false"); /** * Test json kafka events. @@ -128,7 +60,8 @@ public class TestKafka2Kafka { */ @Test public void testJsonKafkaEvents() throws MessagingException, ApexException { - final String[] args = {"src/test/resources/prodcons/Kafka2KafkaJsonEvent.json"}; + final String[] args = + { "src/test/resources/prodcons/Kafka2KafkaJsonEvent.json" }; testKafkaEvents(args, false, "json"); } @@ -140,7 +73,8 @@ public class TestKafka2Kafka { */ @Test public void testXmlKafkaEvents() throws MessagingException, ApexException { - final String[] args = {"src/test/resources/prodcons/Kafka2KafkaXmlEvent.json"}; + final String[] args = + { "src/test/resources/prodcons/Kafka2KafkaXmlEvent.json" }; testKafkaEvents(args, true, "xml"); } @@ -153,23 +87,45 @@ public class TestKafka2Kafka { * @throws MessagingException the messaging exception * @throws ApexException the apex exception */ - private void testKafkaEvents(final String[] args, final Boolean xmlEvents, final String topicSuffix) - throws MessagingException, ApexException { - final KafkaEventSubscriber subscriber = - new KafkaEventSubscriber("apex-out-" + topicSuffix, "localhost:" + BROKERPORT); + private void testKafkaEvents(String[] args, final Boolean xmlEvents, final String topicSuffix) + throws MessagingException, ApexException { + + try { + File tempConfigFile = File.createTempFile("Kafka_", ".json"); + tempConfigFile.deleteOnExit(); + String configAsString = TextFileUtils.getTextFileAsString(args[0]).replaceAll("localhost:39902", + sharedKafkaTestResource.getKafkaConnectString()); + TextFileUtils.putStringAsFile(configAsString, tempConfigFile.getCanonicalFile()); + args[0] = tempConfigFile.getCanonicalPath(); + + } catch (IOException e) { + fail("test should not throw an exception"); + } + + sharedKafkaTestResource.getKafkaTestUtils().createTopic("apex-out-" + topicSuffix, 1, (short) 1); + sharedKafkaTestResource.getKafkaTestUtils().createTopic("apex-in-" + topicSuffix, 1, (short) 1); + + final KafkaEventSubscriber subscriber = new KafkaEventSubscriber("apex-out-" + topicSuffix, + sharedKafkaTestResource); final ApexMain apexMain = new ApexMain(args); ThreadUtilities.sleep(3000); - final KafkaEventProducer producer = new KafkaEventProducer("apex-in-" + topicSuffix, "localhost:" + BROKERPORT, + final KafkaEventProducer producer = new KafkaEventProducer("apex-in-" + topicSuffix, sharedKafkaTestResource, EVENT_COUNT, xmlEvents, EVENT_INTERVAL); producer.sendEvents(); - + final long testStartTime = System.currentTimeMillis(); + // Wait for the producer to send all tis events + while (System.currentTimeMillis() < testStartTime + MAX_TEST_LENGTH + && producer.getEventsSentCount() < EVENT_COUNT) { + ThreadUtilities.sleep(EVENT_INTERVAL); + } + while (System.currentTimeMillis() < testStartTime + MAX_TEST_LENGTH - && subscriber.getEventsReceivedCount() < EVENT_COUNT) { + && subscriber.getEventsReceivedCount() < EVENT_COUNT) { ThreadUtilities.sleep(EVENT_INTERVAL); } -- cgit 1.2.3-korg