From f134a5eb8bc9ddb6c1dea1a58d440bfdec6dab5c Mon Sep 17 00:00:00 2001 From: liamfallon Date: Thu, 2 Apr 2020 20:54:52 +0100 Subject: Fix failing Kafka tests All the Kafka components need something near 10 seconds to come up completely. This review tweaks the timing to allow the test Kafka server to come up and to allow the consumers to connect to it. Issue-ID: POLICY-2106 Change-Id: I6dd8ace0848bdc2549e658ef8908b4d85d5ea789 Signed-off-by: liamfallon --- .../uservice/adapt/kafka/KafkaEventProducer.java | 18 +++--- .../uservice/adapt/kafka/KafkaEventSubscriber.java | 20 +++--- .../uservice/adapt/kafka/TestKafka2Kafka.java | 72 +++++++++++----------- .../taskparameters/TestTaskParameters.java | 25 ++++---- 4 files changed, 72 insertions(+), 63 deletions(-) (limited to 'testsuites/integration/integration-uservice-test/src/test/java') diff --git a/testsuites/integration/integration-uservice-test/src/test/java/org/onap/policy/apex/testsuites/integration/uservice/adapt/kafka/KafkaEventProducer.java b/testsuites/integration/integration-uservice-test/src/test/java/org/onap/policy/apex/testsuites/integration/uservice/adapt/kafka/KafkaEventProducer.java index c214b72a8..66a928677 100644 --- a/testsuites/integration/integration-uservice-test/src/test/java/org/onap/policy/apex/testsuites/integration/uservice/adapt/kafka/KafkaEventProducer.java +++ b/testsuites/integration/integration-uservice-test/src/test/java/org/onap/policy/apex/testsuites/integration/uservice/adapt/kafka/KafkaEventProducer.java @@ -1,7 +1,7 @@ /*- * ============LICENSE_START======================================================= * Copyright (C) 2016-2018 Ericsson. All rights reserved. - * Modifications Copyright (C) 2019 Nordix Foundation. + * Modifications Copyright (C) 2019-2020 Nordix Foundation. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -51,7 +51,7 @@ public class KafkaEventProducer implements Runnable { private final Thread producerThread; private boolean sendEventsFlag = false; - private boolean stopFlag = false; + private volatile boolean stopFlag = false; /** * Instantiates a new kafka event producer. @@ -63,7 +63,7 @@ public class KafkaEventProducer implements Runnable { * @param eventInterval the event interval */ public KafkaEventProducer(final String topic, final SharedKafkaTestResource sharedKafkaTestResource, - final int eventCount, final boolean xmlEvents, final long eventInterval) { + final int eventCount, final boolean xmlEvents, final long eventInterval) { this.topic = topic; this.sharedKafkaTestResource = sharedKafkaTestResource; this.eventCount = eventCount; @@ -80,7 +80,7 @@ public class KafkaEventProducer implements Runnable { @Override public void run() { final Producer producer = sharedKafkaTestResource.getKafkaTestUtils() - .getKafkaProducer(StringSerializer.class, StringSerializer.class); + .getKafkaProducer(StringSerializer.class, StringSerializer.class); while (producerThread.isAlive() && !stopFlag) { ThreadUtilities.sleep(50); @@ -108,11 +108,11 @@ public class KafkaEventProducer implements Runnable { */ private void sendEventsToTopic(final Producer producer) { LOGGER.debug("{} : sending events to Kafka server, event count {}, xmlEvents {}", - KafkaEventProducer.class.getName(), eventCount, xmlEvents); + KafkaEventProducer.class.getName(), eventCount, xmlEvents); for (int i = 0; i < eventCount; i++) { LOGGER.debug("{} : waiting {} milliseconds before sending next event", KafkaEventProducer.class.getName(), - eventInterval); + eventInterval); ThreadUtilities.sleep(eventInterval); String eventString = null; @@ -124,7 +124,7 @@ public class KafkaEventProducer implements Runnable { producer.send(new ProducerRecord(topic, "Event" + i + "Of" + eventCount, eventString)); producer.flush(); eventsSentCount++; - LOGGER.debug("****** Sent event No. {} ******", eventsSentCount); + LOGGER.debug("****** Sent event No. {} ******\n{}", eventsSentCount, eventString); } LOGGER.debug("{}: completed", KafkaEventProducer.class.getName()); } @@ -152,4 +152,8 @@ public class KafkaEventProducer implements Runnable { LOGGER.debug("{} : stopped", KafkaEventProducer.class.getName()); } + + public boolean isAlive() { + return producerThread.isAlive(); + } } diff --git a/testsuites/integration/integration-uservice-test/src/test/java/org/onap/policy/apex/testsuites/integration/uservice/adapt/kafka/KafkaEventSubscriber.java b/testsuites/integration/integration-uservice-test/src/test/java/org/onap/policy/apex/testsuites/integration/uservice/adapt/kafka/KafkaEventSubscriber.java index 1bb2e8021..68ca87953 100644 --- a/testsuites/integration/integration-uservice-test/src/test/java/org/onap/policy/apex/testsuites/integration/uservice/adapt/kafka/KafkaEventSubscriber.java +++ b/testsuites/integration/integration-uservice-test/src/test/java/org/onap/policy/apex/testsuites/integration/uservice/adapt/kafka/KafkaEventSubscriber.java @@ -1,7 +1,7 @@ /*- * ============LICENSE_START======================================================= * Copyright (C) 2016-2018 Ericsson. All rights reserved. - * Modifications Copyright (C) 2019 Nordix Foundation. + * Modifications Copyright (C) 2019-2020 Nordix Foundation. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -52,7 +52,7 @@ public class KafkaEventSubscriber implements Runnable { KafkaConsumer consumer; - Thread subscriberThread; + private final Thread subscriberThread; /** * Instantiates a new kafka event subscriber. @@ -61,15 +61,15 @@ public class KafkaEventSubscriber implements Runnable { * @param sharedKafkaTestResource the kafka server address * @throws MessagingException the messaging exception */ - public KafkaEventSubscriber(final String topic, final SharedKafkaTestResource sharedKafkaTestResource) - throws MessagingException { + public KafkaEventSubscriber(final String topic, + final SharedKafkaTestResource sharedKafkaTestResource) throws MessagingException { this.topic = topic; final Properties consumerProperties = new Properties(); consumerProperties.put("group.id", "test"); consumer = sharedKafkaTestResource.getKafkaTestUtils().getKafkaConsumer(StringDeserializer.class, - StringDeserializer.class, consumerProperties); + StringDeserializer.class, consumerProperties); consumer.subscribe(Arrays.asList(topic)); subscriberThread = new Thread(this); @@ -82,15 +82,15 @@ public class KafkaEventSubscriber implements Runnable { @Override public void run() { LOGGER.debug("{}: receiving events from Kafka server on topic {}", KafkaEventSubscriber.class.getName(), - topic); + topic); while (subscriberThread.isAlive() && !subscriberThread.isInterrupted()) { try { final ConsumerRecords records = consumer.poll(POLL_DURATION); for (final ConsumerRecord record : records) { eventsReceivedCount++; - LOGGER.debug("****** Received event No. {} ******\noffset={}\nkey={}", eventsReceivedCount, - record.offset(), record.key()); + LOGGER.debug("****** Received event No. {} ******\noffset={}\nkey={}\n{}", eventsReceivedCount, + record.offset(), record.key(), record.value()); } } catch (final Exception e) { // Thread interrupted @@ -123,4 +123,8 @@ public class KafkaEventSubscriber implements Runnable { consumer.close(); LOGGER.debug("{} : stopped", KafkaEventSubscriber.class.getName()); } + + public boolean isAlive() { + return subscriberThread.isAlive(); + } } diff --git a/testsuites/integration/integration-uservice-test/src/test/java/org/onap/policy/apex/testsuites/integration/uservice/adapt/kafka/TestKafka2Kafka.java b/testsuites/integration/integration-uservice-test/src/test/java/org/onap/policy/apex/testsuites/integration/uservice/adapt/kafka/TestKafka2Kafka.java index b35b961e4..c3289887f 100644 --- a/testsuites/integration/integration-uservice-test/src/test/java/org/onap/policy/apex/testsuites/integration/uservice/adapt/kafka/TestKafka2Kafka.java +++ b/testsuites/integration/integration-uservice-test/src/test/java/org/onap/policy/apex/testsuites/integration/uservice/adapt/kafka/TestKafka2Kafka.java @@ -21,6 +21,7 @@ package org.onap.policy.apex.testsuites.integration.uservice.adapt.kafka; +import static org.awaitility.Awaitility.await; import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; @@ -28,13 +29,11 @@ import com.salesforce.kafka.test.junit4.SharedKafkaTestResource; import java.io.File; import java.io.IOException; +import java.util.concurrent.TimeUnit; import org.junit.Before; import org.junit.ClassRule; import org.junit.Test; -import org.onap.policy.apex.core.infrastructure.messaging.MessagingException; -import org.onap.policy.apex.core.infrastructure.threading.ThreadUtilities; -import org.onap.policy.apex.model.basicmodel.concepts.ApexException; import org.onap.policy.apex.service.engine.main.ApexMain; import org.onap.policy.common.utils.resources.TextFileUtils; @@ -57,21 +56,20 @@ public class TestKafka2Kafka { @ClassRule public static final SharedKafkaTestResource sharedKafkaTestResource = new SharedKafkaTestResource() - // Start a cluster with 1 brokers. - .withBrokers(1) - // Disable topic auto-creation. - .withBrokerProperty("auto.create.topics.enable", "false"); + // Start a cluster with 1 brokers. + .withBrokers(1) + // Disable topic auto-creation. + .withBrokerProperty("auto.create.topics.enable", "false"); /** * Test json kafka events. * - * @throws MessagingException the messaging exception - * @throws ApexException the apex exception + * @throws Exception the apex exception */ @Test - public void testJsonKafkaEvents() throws MessagingException, ApexException { + public void testJsonKafkaEvents() throws Exception { final String conditionedConfigFile = getConditionedConfigFile( - "target" + File.separator + "examples/config/SampleDomain/Kafka2KafkaJsonEvent.json"); + "target" + File.separator + "examples/config/SampleDomain/Kafka2KafkaJsonEvent.json"); final String[] args = {"-rfr", "target", "-c", conditionedConfigFile}; testKafkaEvents(args, false, "json"); } @@ -79,13 +77,12 @@ public class TestKafka2Kafka { /** * Test XML kafka events. * - * @throws MessagingException the messaging exception - * @throws ApexException the apex exception + * @throws Exception the apex exception */ @Test - public void testXmlKafkaEvents() throws MessagingException, ApexException { + public void testXmlKafkaEvents() throws Exception { final String conditionedConfigFile = getConditionedConfigFile( - "target" + File.separator + "examples/config/SampleDomain/Kafka2KafkaXmlEvent.json"); + "target" + File.separator + "examples/config/SampleDomain/Kafka2KafkaXmlEvent.json"); final String[] args = {"-rfr", "target", "-c", conditionedConfigFile}; testKafkaEvents(args, true, "xml"); @@ -97,44 +94,47 @@ public class TestKafka2Kafka { * @param args the args * @param xmlEvents the xml events * @param topicSuffix the topic suffix - * @throws MessagingException the messaging exception - * @throws ApexException the apex exception + * @throws Exception on errors */ - private void testKafkaEvents(String[] args, final Boolean xmlEvents, final String topicSuffix) - throws MessagingException, ApexException { + private void testKafkaEvents(String[] args, final Boolean xmlEvents, final String topicSuffix) throws Exception { sharedKafkaTestResource.getKafkaTestUtils().createTopic("apex-out-" + topicSuffix, 1, (short) 1); sharedKafkaTestResource.getKafkaTestUtils().createTopic("apex-in-" + topicSuffix, 1, (short) 1); final KafkaEventSubscriber subscriber = - new KafkaEventSubscriber("apex-out-" + topicSuffix, sharedKafkaTestResource); + new KafkaEventSubscriber("apex-out-" + topicSuffix, sharedKafkaTestResource); + + await().atMost(30, TimeUnit.SECONDS).until(() -> subscriber.isAlive()); final ApexMain apexMain = new ApexMain(args); - ThreadUtilities.sleep(3000); + await().atMost(10, TimeUnit.SECONDS).until(() -> apexMain.isAlive()); - final KafkaEventProducer producer = new KafkaEventProducer("apex-in-" + topicSuffix, sharedKafkaTestResource, - EVENT_COUNT, xmlEvents, EVENT_INTERVAL); + long initWaitEndTIme = System.currentTimeMillis() + 10000; - producer.sendEvents(); + await().atMost(12, TimeUnit.SECONDS).until(() -> initWaitEndTIme < System.currentTimeMillis()); - final long testStartTime = System.currentTimeMillis(); + final KafkaEventProducer producer = new KafkaEventProducer("apex-in-" + topicSuffix, sharedKafkaTestResource, + EVENT_COUNT, xmlEvents, EVENT_INTERVAL); - // Wait for the producer to send all tis events - while (System.currentTimeMillis() < testStartTime + MAX_TEST_LENGTH - && producer.getEventsSentCount() < EVENT_COUNT) { - ThreadUtilities.sleep(EVENT_INTERVAL); - } + await().atMost(30, TimeUnit.SECONDS).until(() -> producer.isAlive()); - while (System.currentTimeMillis() < testStartTime + MAX_TEST_LENGTH - && subscriber.getEventsReceivedCount() < EVENT_COUNT) { - ThreadUtilities.sleep(EVENT_INTERVAL); - } + producer.sendEvents(); + + // Wait for the producer to send all its events + await().atMost(MAX_TEST_LENGTH, TimeUnit.MILLISECONDS) + .until(() -> producer.getEventsSentCount() >= EVENT_COUNT); - ThreadUtilities.sleep(3000); + await().atMost(MAX_TEST_LENGTH, TimeUnit.MILLISECONDS) + .until(() -> subscriber.getEventsReceivedCount() >= EVENT_COUNT); apexMain.shutdown(); + await().atMost(30, TimeUnit.SECONDS).until(() -> !apexMain.isAlive()); + subscriber.shutdown(); + await().atMost(30, TimeUnit.SECONDS).until(() -> !subscriber.isAlive()); + producer.shutdown(); + await().atMost(30, TimeUnit.SECONDS).until(() -> !producer.isAlive()); assertEquals(producer.getEventsSentCount(), subscriber.getEventsReceivedCount()); } @@ -144,7 +144,7 @@ public class TestKafka2Kafka { File tempConfigFile = File.createTempFile("Kafka_", ".json"); tempConfigFile.deleteOnExit(); String configAsString = TextFileUtils.getTextFileAsString(configurationFileName) - .replaceAll("localhost:39902", sharedKafkaTestResource.getKafkaConnectString()); + .replaceAll("localhost:39902", sharedKafkaTestResource.getKafkaConnectString()); TextFileUtils.putStringAsFile(configAsString, tempConfigFile.getCanonicalFile()); return tempConfigFile.getCanonicalPath(); diff --git a/testsuites/integration/integration-uservice-test/src/test/java/org/onap/policy/apex/testsuites/integration/uservice/taskparameters/TestTaskParameters.java b/testsuites/integration/integration-uservice-test/src/test/java/org/onap/policy/apex/testsuites/integration/uservice/taskparameters/TestTaskParameters.java index da93f919d..3d1a8d7bc 100644 --- a/testsuites/integration/integration-uservice-test/src/test/java/org/onap/policy/apex/testsuites/integration/uservice/taskparameters/TestTaskParameters.java +++ b/testsuites/integration/integration-uservice-test/src/test/java/org/onap/policy/apex/testsuites/integration/uservice/taskparameters/TestTaskParameters.java @@ -24,9 +24,11 @@ import static org.awaitility.Awaitility.await; import static org.junit.Assert.assertTrue; import java.util.concurrent.TimeUnit; + import javax.ws.rs.client.Client; import javax.ws.rs.client.ClientBuilder; import javax.ws.rs.core.Response; + import org.junit.AfterClass; import org.junit.Before; import org.junit.BeforeClass; @@ -42,10 +44,9 @@ import org.slf4j.ext.XLogger; import org.slf4j.ext.XLoggerFactory; /** - * This class runs integration tests for taskParameters. - * Task parameters are read from the ApexConfig, and they can be accessed in task logic. - * In this case, the taskParameters are used to set values in executionProperties. - * URL dynamically populated using executionProperties is hit and values get updated in + * This class runs integration tests for taskParameters. Task parameters are read from the ApexConfig, and they can be + * accessed in task logic. In this case, the taskParameters are used to set values in executionProperties. URL + * dynamically populated using executionProperties is hit and values get updated in * {@link RestClientEndpointForTaskParameters} which acts as a temporary server for requests. */ public class TestTaskParameters { @@ -121,8 +122,8 @@ public class TestTaskParameters { } /** - * Test taskParameters with no taskIds. - * When taskIds are not provided, all taskParameters provided in config will be updated to all tasks. + * Test taskParameters with no taskIds. When taskIds are not provided, all taskParameters provided in config will be + * updated to all tasks. */ @Test public void testTaskParameters_with_noTaskIds() throws Exception { @@ -132,8 +133,8 @@ public class TestTaskParameters { } /** - * Test taskParameters with valid taskIds. - * When valid taskIds are provided, the the taskParameter will be updated in that particular task alone. + * Test taskParameters with valid taskIds. When valid taskIds are provided, the the taskParameter will be updated in + * that particular task alone. */ @Test public void testTaskParameters_with_validTaskIds() throws Exception { @@ -143,9 +144,9 @@ public class TestTaskParameters { } /** - * Test taskParameters with invalid taskIds. - * When invalid taskIds are provided, or when a taskParameter assigned to a particular taskId is tried to be - * accessed in a taskLogic of a different task, such taskParameters won't be accessible in the task + * Test taskParameters with invalid taskIds. When invalid taskIds are provided, or when a taskParameter assigned to + * a particular taskId is tried to be accessed in a taskLogic of a different task, such taskParameters won't be + * accessible in the task */ @Test public void testTaskParameters_with_invalidTaskIds() throws Exception { @@ -165,7 +166,7 @@ public class TestTaskParameters { String getDetailsUrl = "http://" + HOST + ":" + PORT + "/TestTaskParametersRest/apex/event/getDetails"; // wait for success response code to be received, until a timeout - await().atMost(2000, TimeUnit.MILLISECONDS) + await().atMost(5, TimeUnit.SECONDS) .until(() -> 200 == client.target(getDetailsUrl).request("application/json").get().getStatus()); apexMain.shutdown(); Response response = client.target(getDetailsUrl).request("application/json").get(); -- cgit 1.2.3-korg