diff options
Diffstat (limited to 'testsuites/integration/integration-uservice-test/src/test')
5 files changed, 86 insertions, 63 deletions
diff --git a/testsuites/integration/integration-uservice-test/src/test/java/org/onap/policy/apex/testsuites/integration/uservice/adapt/kafka/KafkaEventProducer.java b/testsuites/integration/integration-uservice-test/src/test/java/org/onap/policy/apex/testsuites/integration/uservice/adapt/kafka/KafkaEventProducer.java index c214b72a8..66a928677 100644 --- a/testsuites/integration/integration-uservice-test/src/test/java/org/onap/policy/apex/testsuites/integration/uservice/adapt/kafka/KafkaEventProducer.java +++ b/testsuites/integration/integration-uservice-test/src/test/java/org/onap/policy/apex/testsuites/integration/uservice/adapt/kafka/KafkaEventProducer.java @@ -1,7 +1,7 @@ /*- * ============LICENSE_START======================================================= * Copyright (C) 2016-2018 Ericsson. All rights reserved. - * Modifications Copyright (C) 2019 Nordix Foundation. + * Modifications Copyright (C) 2019-2020 Nordix Foundation. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -51,7 +51,7 @@ public class KafkaEventProducer implements Runnable { private final Thread producerThread; private boolean sendEventsFlag = false; - private boolean stopFlag = false; + private volatile boolean stopFlag = false; /** * Instantiates a new kafka event producer. @@ -63,7 +63,7 @@ public class KafkaEventProducer implements Runnable { * @param eventInterval the event interval */ public KafkaEventProducer(final String topic, final SharedKafkaTestResource sharedKafkaTestResource, - final int eventCount, final boolean xmlEvents, final long eventInterval) { + final int eventCount, final boolean xmlEvents, final long eventInterval) { this.topic = topic; this.sharedKafkaTestResource = sharedKafkaTestResource; this.eventCount = eventCount; @@ -80,7 +80,7 @@ public class KafkaEventProducer implements Runnable { @Override public void run() { final Producer<String, String> producer = sharedKafkaTestResource.getKafkaTestUtils() - .getKafkaProducer(StringSerializer.class, StringSerializer.class); + .getKafkaProducer(StringSerializer.class, StringSerializer.class); while (producerThread.isAlive() && !stopFlag) { ThreadUtilities.sleep(50); @@ -108,11 +108,11 @@ public class KafkaEventProducer implements Runnable { */ private void sendEventsToTopic(final Producer<String, String> producer) { LOGGER.debug("{} : sending events to Kafka server, event count {}, xmlEvents {}", - KafkaEventProducer.class.getName(), eventCount, xmlEvents); + KafkaEventProducer.class.getName(), eventCount, xmlEvents); for (int i = 0; i < eventCount; i++) { LOGGER.debug("{} : waiting {} milliseconds before sending next event", KafkaEventProducer.class.getName(), - eventInterval); + eventInterval); ThreadUtilities.sleep(eventInterval); String eventString = null; @@ -124,7 +124,7 @@ public class KafkaEventProducer implements Runnable { producer.send(new ProducerRecord<String, String>(topic, "Event" + i + "Of" + eventCount, eventString)); producer.flush(); eventsSentCount++; - LOGGER.debug("****** Sent event No. {} ******", eventsSentCount); + LOGGER.debug("****** Sent event No. {} ******\n{}", eventsSentCount, eventString); } LOGGER.debug("{}: completed", KafkaEventProducer.class.getName()); } @@ -152,4 +152,8 @@ public class KafkaEventProducer implements Runnable { LOGGER.debug("{} : stopped", KafkaEventProducer.class.getName()); } + + public boolean isAlive() { + return producerThread.isAlive(); + } } diff --git a/testsuites/integration/integration-uservice-test/src/test/java/org/onap/policy/apex/testsuites/integration/uservice/adapt/kafka/KafkaEventSubscriber.java b/testsuites/integration/integration-uservice-test/src/test/java/org/onap/policy/apex/testsuites/integration/uservice/adapt/kafka/KafkaEventSubscriber.java index 1bb2e8021..68ca87953 100644 --- a/testsuites/integration/integration-uservice-test/src/test/java/org/onap/policy/apex/testsuites/integration/uservice/adapt/kafka/KafkaEventSubscriber.java +++ b/testsuites/integration/integration-uservice-test/src/test/java/org/onap/policy/apex/testsuites/integration/uservice/adapt/kafka/KafkaEventSubscriber.java @@ -1,7 +1,7 @@ /*- * ============LICENSE_START======================================================= * Copyright (C) 2016-2018 Ericsson. All rights reserved. - * Modifications Copyright (C) 2019 Nordix Foundation. + * Modifications Copyright (C) 2019-2020 Nordix Foundation. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -52,7 +52,7 @@ public class KafkaEventSubscriber implements Runnable { KafkaConsumer<String, String> consumer; - Thread subscriberThread; + private final Thread subscriberThread; /** * Instantiates a new kafka event subscriber. @@ -61,15 +61,15 @@ public class KafkaEventSubscriber implements Runnable { * @param sharedKafkaTestResource the kafka server address * @throws MessagingException the messaging exception */ - public KafkaEventSubscriber(final String topic, final SharedKafkaTestResource sharedKafkaTestResource) - throws MessagingException { + public KafkaEventSubscriber(final String topic, + final SharedKafkaTestResource sharedKafkaTestResource) throws MessagingException { this.topic = topic; final Properties consumerProperties = new Properties(); consumerProperties.put("group.id", "test"); consumer = sharedKafkaTestResource.getKafkaTestUtils().getKafkaConsumer(StringDeserializer.class, - StringDeserializer.class, consumerProperties); + StringDeserializer.class, consumerProperties); consumer.subscribe(Arrays.asList(topic)); subscriberThread = new Thread(this); @@ -82,15 +82,15 @@ public class KafkaEventSubscriber implements Runnable { @Override public void run() { LOGGER.debug("{}: receiving events from Kafka server on topic {}", KafkaEventSubscriber.class.getName(), - topic); + topic); while (subscriberThread.isAlive() && !subscriberThread.isInterrupted()) { try { final ConsumerRecords<String, String> records = consumer.poll(POLL_DURATION); for (final ConsumerRecord<String, String> record : records) { eventsReceivedCount++; - LOGGER.debug("****** Received event No. {} ******\noffset={}\nkey={}", eventsReceivedCount, - record.offset(), record.key()); + LOGGER.debug("****** Received event No. {} ******\noffset={}\nkey={}\n{}", eventsReceivedCount, + record.offset(), record.key(), record.value()); } } catch (final Exception e) { // Thread interrupted @@ -123,4 +123,8 @@ public class KafkaEventSubscriber implements Runnable { consumer.close(); LOGGER.debug("{} : stopped", KafkaEventSubscriber.class.getName()); } + + public boolean isAlive() { + return subscriberThread.isAlive(); + } } diff --git a/testsuites/integration/integration-uservice-test/src/test/java/org/onap/policy/apex/testsuites/integration/uservice/adapt/kafka/TestKafka2Kafka.java b/testsuites/integration/integration-uservice-test/src/test/java/org/onap/policy/apex/testsuites/integration/uservice/adapt/kafka/TestKafka2Kafka.java index b35b961e4..c3289887f 100644 --- a/testsuites/integration/integration-uservice-test/src/test/java/org/onap/policy/apex/testsuites/integration/uservice/adapt/kafka/TestKafka2Kafka.java +++ b/testsuites/integration/integration-uservice-test/src/test/java/org/onap/policy/apex/testsuites/integration/uservice/adapt/kafka/TestKafka2Kafka.java @@ -21,6 +21,7 @@ package org.onap.policy.apex.testsuites.integration.uservice.adapt.kafka; +import static org.awaitility.Awaitility.await; import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; @@ -28,13 +29,11 @@ import com.salesforce.kafka.test.junit4.SharedKafkaTestResource; import java.io.File; import java.io.IOException; +import java.util.concurrent.TimeUnit; import org.junit.Before; import org.junit.ClassRule; import org.junit.Test; -import org.onap.policy.apex.core.infrastructure.messaging.MessagingException; -import org.onap.policy.apex.core.infrastructure.threading.ThreadUtilities; -import org.onap.policy.apex.model.basicmodel.concepts.ApexException; import org.onap.policy.apex.service.engine.main.ApexMain; import org.onap.policy.common.utils.resources.TextFileUtils; @@ -57,21 +56,20 @@ public class TestKafka2Kafka { @ClassRule public static final SharedKafkaTestResource sharedKafkaTestResource = new SharedKafkaTestResource() - // Start a cluster with 1 brokers. - .withBrokers(1) - // Disable topic auto-creation. - .withBrokerProperty("auto.create.topics.enable", "false"); + // Start a cluster with 1 brokers. + .withBrokers(1) + // Disable topic auto-creation. + .withBrokerProperty("auto.create.topics.enable", "false"); /** * Test json kafka events. * - * @throws MessagingException the messaging exception - * @throws ApexException the apex exception + * @throws Exception the apex exception */ @Test - public void testJsonKafkaEvents() throws MessagingException, ApexException { + public void testJsonKafkaEvents() throws Exception { final String conditionedConfigFile = getConditionedConfigFile( - "target" + File.separator + "examples/config/SampleDomain/Kafka2KafkaJsonEvent.json"); + "target" + File.separator + "examples/config/SampleDomain/Kafka2KafkaJsonEvent.json"); final String[] args = {"-rfr", "target", "-c", conditionedConfigFile}; testKafkaEvents(args, false, "json"); } @@ -79,13 +77,12 @@ public class TestKafka2Kafka { /** * Test XML kafka events. * - * @throws MessagingException the messaging exception - * @throws ApexException the apex exception + * @throws Exception the apex exception */ @Test - public void testXmlKafkaEvents() throws MessagingException, ApexException { + public void testXmlKafkaEvents() throws Exception { final String conditionedConfigFile = getConditionedConfigFile( - "target" + File.separator + "examples/config/SampleDomain/Kafka2KafkaXmlEvent.json"); + "target" + File.separator + "examples/config/SampleDomain/Kafka2KafkaXmlEvent.json"); final String[] args = {"-rfr", "target", "-c", conditionedConfigFile}; testKafkaEvents(args, true, "xml"); @@ -97,44 +94,47 @@ public class TestKafka2Kafka { * @param args the args * @param xmlEvents the xml events * @param topicSuffix the topic suffix - * @throws MessagingException the messaging exception - * @throws ApexException the apex exception + * @throws Exception on errors */ - private void testKafkaEvents(String[] args, final Boolean xmlEvents, final String topicSuffix) - throws MessagingException, ApexException { + private void testKafkaEvents(String[] args, final Boolean xmlEvents, final String topicSuffix) throws Exception { sharedKafkaTestResource.getKafkaTestUtils().createTopic("apex-out-" + topicSuffix, 1, (short) 1); sharedKafkaTestResource.getKafkaTestUtils().createTopic("apex-in-" + topicSuffix, 1, (short) 1); final KafkaEventSubscriber subscriber = - new KafkaEventSubscriber("apex-out-" + topicSuffix, sharedKafkaTestResource); + new KafkaEventSubscriber("apex-out-" + topicSuffix, sharedKafkaTestResource); + + await().atMost(30, TimeUnit.SECONDS).until(() -> subscriber.isAlive()); final ApexMain apexMain = new ApexMain(args); - ThreadUtilities.sleep(3000); + await().atMost(10, TimeUnit.SECONDS).until(() -> apexMain.isAlive()); - final KafkaEventProducer producer = new KafkaEventProducer("apex-in-" + topicSuffix, sharedKafkaTestResource, - EVENT_COUNT, xmlEvents, EVENT_INTERVAL); + long initWaitEndTIme = System.currentTimeMillis() + 10000; - producer.sendEvents(); + await().atMost(12, TimeUnit.SECONDS).until(() -> initWaitEndTIme < System.currentTimeMillis()); - final long testStartTime = System.currentTimeMillis(); + final KafkaEventProducer producer = new KafkaEventProducer("apex-in-" + topicSuffix, sharedKafkaTestResource, + EVENT_COUNT, xmlEvents, EVENT_INTERVAL); - // Wait for the producer to send all tis events - while (System.currentTimeMillis() < testStartTime + MAX_TEST_LENGTH - && producer.getEventsSentCount() < EVENT_COUNT) { - ThreadUtilities.sleep(EVENT_INTERVAL); - } + await().atMost(30, TimeUnit.SECONDS).until(() -> producer.isAlive()); - while (System.currentTimeMillis() < testStartTime + MAX_TEST_LENGTH - && subscriber.getEventsReceivedCount() < EVENT_COUNT) { - ThreadUtilities.sleep(EVENT_INTERVAL); - } + producer.sendEvents(); + + // Wait for the producer to send all its events + await().atMost(MAX_TEST_LENGTH, TimeUnit.MILLISECONDS) + .until(() -> producer.getEventsSentCount() >= EVENT_COUNT); - ThreadUtilities.sleep(3000); + await().atMost(MAX_TEST_LENGTH, TimeUnit.MILLISECONDS) + .until(() -> subscriber.getEventsReceivedCount() >= EVENT_COUNT); apexMain.shutdown(); + await().atMost(30, TimeUnit.SECONDS).until(() -> !apexMain.isAlive()); + subscriber.shutdown(); + await().atMost(30, TimeUnit.SECONDS).until(() -> !subscriber.isAlive()); + producer.shutdown(); + await().atMost(30, TimeUnit.SECONDS).until(() -> !producer.isAlive()); assertEquals(producer.getEventsSentCount(), subscriber.getEventsReceivedCount()); } @@ -144,7 +144,7 @@ public class TestKafka2Kafka { File tempConfigFile = File.createTempFile("Kafka_", ".json"); tempConfigFile.deleteOnExit(); String configAsString = TextFileUtils.getTextFileAsString(configurationFileName) - .replaceAll("localhost:39902", sharedKafkaTestResource.getKafkaConnectString()); + .replaceAll("localhost:39902", sharedKafkaTestResource.getKafkaConnectString()); TextFileUtils.putStringAsFile(configAsString, tempConfigFile.getCanonicalFile()); return tempConfigFile.getCanonicalPath(); diff --git a/testsuites/integration/integration-uservice-test/src/test/java/org/onap/policy/apex/testsuites/integration/uservice/taskparameters/TestTaskParameters.java b/testsuites/integration/integration-uservice-test/src/test/java/org/onap/policy/apex/testsuites/integration/uservice/taskparameters/TestTaskParameters.java index da93f919d..3d1a8d7bc 100644 --- a/testsuites/integration/integration-uservice-test/src/test/java/org/onap/policy/apex/testsuites/integration/uservice/taskparameters/TestTaskParameters.java +++ b/testsuites/integration/integration-uservice-test/src/test/java/org/onap/policy/apex/testsuites/integration/uservice/taskparameters/TestTaskParameters.java @@ -24,9 +24,11 @@ import static org.awaitility.Awaitility.await; import static org.junit.Assert.assertTrue; import java.util.concurrent.TimeUnit; + import javax.ws.rs.client.Client; import javax.ws.rs.client.ClientBuilder; import javax.ws.rs.core.Response; + import org.junit.AfterClass; import org.junit.Before; import org.junit.BeforeClass; @@ -42,10 +44,9 @@ import org.slf4j.ext.XLogger; import org.slf4j.ext.XLoggerFactory; /** - * This class runs integration tests for taskParameters. - * Task parameters are read from the ApexConfig, and they can be accessed in task logic. - * In this case, the taskParameters are used to set values in executionProperties. - * URL dynamically populated using executionProperties is hit and values get updated in + * This class runs integration tests for taskParameters. Task parameters are read from the ApexConfig, and they can be + * accessed in task logic. In this case, the taskParameters are used to set values in executionProperties. URL + * dynamically populated using executionProperties is hit and values get updated in * {@link RestClientEndpointForTaskParameters} which acts as a temporary server for requests. */ public class TestTaskParameters { @@ -121,8 +122,8 @@ public class TestTaskParameters { } /** - * Test taskParameters with no taskIds. - * When taskIds are not provided, all taskParameters provided in config will be updated to all tasks. + * Test taskParameters with no taskIds. When taskIds are not provided, all taskParameters provided in config will be + * updated to all tasks. */ @Test public void testTaskParameters_with_noTaskIds() throws Exception { @@ -132,8 +133,8 @@ public class TestTaskParameters { } /** - * Test taskParameters with valid taskIds. - * When valid taskIds are provided, the the taskParameter will be updated in that particular task alone. + * Test taskParameters with valid taskIds. When valid taskIds are provided, the the taskParameter will be updated in + * that particular task alone. */ @Test public void testTaskParameters_with_validTaskIds() throws Exception { @@ -143,9 +144,9 @@ public class TestTaskParameters { } /** - * Test taskParameters with invalid taskIds. - * When invalid taskIds are provided, or when a taskParameter assigned to a particular taskId is tried to be - * accessed in a taskLogic of a different task, such taskParameters won't be accessible in the task + * Test taskParameters with invalid taskIds. When invalid taskIds are provided, or when a taskParameter assigned to + * a particular taskId is tried to be accessed in a taskLogic of a different task, such taskParameters won't be + * accessible in the task */ @Test public void testTaskParameters_with_invalidTaskIds() throws Exception { @@ -165,7 +166,7 @@ public class TestTaskParameters { String getDetailsUrl = "http://" + HOST + ":" + PORT + "/TestTaskParametersRest/apex/event/getDetails"; // wait for success response code to be received, until a timeout - await().atMost(2000, TimeUnit.MILLISECONDS) + await().atMost(5, TimeUnit.SECONDS) .until(() -> 200 == client.target(getDetailsUrl).request("application/json").get().getStatus()); apexMain.shutdown(); Response response = client.target(getDetailsUrl).request("application/json").get(); diff --git a/testsuites/integration/integration-uservice-test/src/test/resources/logback-test.xml b/testsuites/integration/integration-uservice-test/src/test/resources/logback-test.xml index fbe9ffc29..341a9fdaa 100644 --- a/testsuites/integration/integration-uservice-test/src/test/resources/logback-test.xml +++ b/testsuites/integration/integration-uservice-test/src/test/resources/logback-test.xml @@ -35,4 +35,18 @@ <appender-ref ref="STDOUT" /> </root> + + <logger name="org.onap.policy.apex" level="INFO" additivity="false"> + <appender-ref ref="STDOUT" /> + </logger> + + <logger name="org.onap.policy.apex.core" level="INFO" additivity="false"> + <appender-ref ref="STDOUT" /> + </logger> + + <logger name="org.onap.policy.apex.plugins.executor" level="INFO" additivity="false"> + <appender-ref ref="STDOUT" /> + </logger> + + </configuration> |