summaryrefslogtreecommitdiffstats
path: root/testsuites/integration
diff options
context:
space:
mode:
Diffstat (limited to 'testsuites/integration')
-rw-r--r--testsuites/integration/integration-uservice-test/src/test/java/org/onap/policy/apex/testsuites/integration/uservice/adapt/kafka/KafkaEventProducer.java18
-rw-r--r--testsuites/integration/integration-uservice-test/src/test/java/org/onap/policy/apex/testsuites/integration/uservice/adapt/kafka/KafkaEventSubscriber.java20
-rw-r--r--testsuites/integration/integration-uservice-test/src/test/java/org/onap/policy/apex/testsuites/integration/uservice/adapt/kafka/TestKafka2Kafka.java72
-rw-r--r--testsuites/integration/integration-uservice-test/src/test/java/org/onap/policy/apex/testsuites/integration/uservice/taskparameters/TestTaskParameters.java25
-rw-r--r--testsuites/integration/integration-uservice-test/src/test/resources/logback-test.xml14
5 files changed, 86 insertions, 63 deletions
diff --git a/testsuites/integration/integration-uservice-test/src/test/java/org/onap/policy/apex/testsuites/integration/uservice/adapt/kafka/KafkaEventProducer.java b/testsuites/integration/integration-uservice-test/src/test/java/org/onap/policy/apex/testsuites/integration/uservice/adapt/kafka/KafkaEventProducer.java
index c214b72a8..66a928677 100644
--- a/testsuites/integration/integration-uservice-test/src/test/java/org/onap/policy/apex/testsuites/integration/uservice/adapt/kafka/KafkaEventProducer.java
+++ b/testsuites/integration/integration-uservice-test/src/test/java/org/onap/policy/apex/testsuites/integration/uservice/adapt/kafka/KafkaEventProducer.java
@@ -1,7 +1,7 @@
/*-
* ============LICENSE_START=======================================================
* Copyright (C) 2016-2018 Ericsson. All rights reserved.
- * Modifications Copyright (C) 2019 Nordix Foundation.
+ * Modifications Copyright (C) 2019-2020 Nordix Foundation.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -51,7 +51,7 @@ public class KafkaEventProducer implements Runnable {
private final Thread producerThread;
private boolean sendEventsFlag = false;
- private boolean stopFlag = false;
+ private volatile boolean stopFlag = false;
/**
* Instantiates a new kafka event producer.
@@ -63,7 +63,7 @@ public class KafkaEventProducer implements Runnable {
* @param eventInterval the event interval
*/
public KafkaEventProducer(final String topic, final SharedKafkaTestResource sharedKafkaTestResource,
- final int eventCount, final boolean xmlEvents, final long eventInterval) {
+ final int eventCount, final boolean xmlEvents, final long eventInterval) {
this.topic = topic;
this.sharedKafkaTestResource = sharedKafkaTestResource;
this.eventCount = eventCount;
@@ -80,7 +80,7 @@ public class KafkaEventProducer implements Runnable {
@Override
public void run() {
final Producer<String, String> producer = sharedKafkaTestResource.getKafkaTestUtils()
- .getKafkaProducer(StringSerializer.class, StringSerializer.class);
+ .getKafkaProducer(StringSerializer.class, StringSerializer.class);
while (producerThread.isAlive() && !stopFlag) {
ThreadUtilities.sleep(50);
@@ -108,11 +108,11 @@ public class KafkaEventProducer implements Runnable {
*/
private void sendEventsToTopic(final Producer<String, String> producer) {
LOGGER.debug("{} : sending events to Kafka server, event count {}, xmlEvents {}",
- KafkaEventProducer.class.getName(), eventCount, xmlEvents);
+ KafkaEventProducer.class.getName(), eventCount, xmlEvents);
for (int i = 0; i < eventCount; i++) {
LOGGER.debug("{} : waiting {} milliseconds before sending next event", KafkaEventProducer.class.getName(),
- eventInterval);
+ eventInterval);
ThreadUtilities.sleep(eventInterval);
String eventString = null;
@@ -124,7 +124,7 @@ public class KafkaEventProducer implements Runnable {
producer.send(new ProducerRecord<String, String>(topic, "Event" + i + "Of" + eventCount, eventString));
producer.flush();
eventsSentCount++;
- LOGGER.debug("****** Sent event No. {} ******", eventsSentCount);
+ LOGGER.debug("****** Sent event No. {} ******\n{}", eventsSentCount, eventString);
}
LOGGER.debug("{}: completed", KafkaEventProducer.class.getName());
}
@@ -152,4 +152,8 @@ public class KafkaEventProducer implements Runnable {
LOGGER.debug("{} : stopped", KafkaEventProducer.class.getName());
}
+
+ public boolean isAlive() {
+ return producerThread.isAlive();
+ }
}
diff --git a/testsuites/integration/integration-uservice-test/src/test/java/org/onap/policy/apex/testsuites/integration/uservice/adapt/kafka/KafkaEventSubscriber.java b/testsuites/integration/integration-uservice-test/src/test/java/org/onap/policy/apex/testsuites/integration/uservice/adapt/kafka/KafkaEventSubscriber.java
index 1bb2e8021..68ca87953 100644
--- a/testsuites/integration/integration-uservice-test/src/test/java/org/onap/policy/apex/testsuites/integration/uservice/adapt/kafka/KafkaEventSubscriber.java
+++ b/testsuites/integration/integration-uservice-test/src/test/java/org/onap/policy/apex/testsuites/integration/uservice/adapt/kafka/KafkaEventSubscriber.java
@@ -1,7 +1,7 @@
/*-
* ============LICENSE_START=======================================================
* Copyright (C) 2016-2018 Ericsson. All rights reserved.
- * Modifications Copyright (C) 2019 Nordix Foundation.
+ * Modifications Copyright (C) 2019-2020 Nordix Foundation.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -52,7 +52,7 @@ public class KafkaEventSubscriber implements Runnable {
KafkaConsumer<String, String> consumer;
- Thread subscriberThread;
+ private final Thread subscriberThread;
/**
* Instantiates a new kafka event subscriber.
@@ -61,15 +61,15 @@ public class KafkaEventSubscriber implements Runnable {
* @param sharedKafkaTestResource the kafka server address
* @throws MessagingException the messaging exception
*/
- public KafkaEventSubscriber(final String topic, final SharedKafkaTestResource sharedKafkaTestResource)
- throws MessagingException {
+ public KafkaEventSubscriber(final String topic,
+ final SharedKafkaTestResource sharedKafkaTestResource) throws MessagingException {
this.topic = topic;
final Properties consumerProperties = new Properties();
consumerProperties.put("group.id", "test");
consumer = sharedKafkaTestResource.getKafkaTestUtils().getKafkaConsumer(StringDeserializer.class,
- StringDeserializer.class, consumerProperties);
+ StringDeserializer.class, consumerProperties);
consumer.subscribe(Arrays.asList(topic));
subscriberThread = new Thread(this);
@@ -82,15 +82,15 @@ public class KafkaEventSubscriber implements Runnable {
@Override
public void run() {
LOGGER.debug("{}: receiving events from Kafka server on topic {}", KafkaEventSubscriber.class.getName(),
- topic);
+ topic);
while (subscriberThread.isAlive() && !subscriberThread.isInterrupted()) {
try {
final ConsumerRecords<String, String> records = consumer.poll(POLL_DURATION);
for (final ConsumerRecord<String, String> record : records) {
eventsReceivedCount++;
- LOGGER.debug("****** Received event No. {} ******\noffset={}\nkey={}", eventsReceivedCount,
- record.offset(), record.key());
+ LOGGER.debug("****** Received event No. {} ******\noffset={}\nkey={}\n{}", eventsReceivedCount,
+ record.offset(), record.key(), record.value());
}
} catch (final Exception e) {
// Thread interrupted
@@ -123,4 +123,8 @@ public class KafkaEventSubscriber implements Runnable {
consumer.close();
LOGGER.debug("{} : stopped", KafkaEventSubscriber.class.getName());
}
+
+ public boolean isAlive() {
+ return subscriberThread.isAlive();
+ }
}
diff --git a/testsuites/integration/integration-uservice-test/src/test/java/org/onap/policy/apex/testsuites/integration/uservice/adapt/kafka/TestKafka2Kafka.java b/testsuites/integration/integration-uservice-test/src/test/java/org/onap/policy/apex/testsuites/integration/uservice/adapt/kafka/TestKafka2Kafka.java
index b35b961e4..c3289887f 100644
--- a/testsuites/integration/integration-uservice-test/src/test/java/org/onap/policy/apex/testsuites/integration/uservice/adapt/kafka/TestKafka2Kafka.java
+++ b/testsuites/integration/integration-uservice-test/src/test/java/org/onap/policy/apex/testsuites/integration/uservice/adapt/kafka/TestKafka2Kafka.java
@@ -21,6 +21,7 @@
package org.onap.policy.apex.testsuites.integration.uservice.adapt.kafka;
+import static org.awaitility.Awaitility.await;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
@@ -28,13 +29,11 @@ import com.salesforce.kafka.test.junit4.SharedKafkaTestResource;
import java.io.File;
import java.io.IOException;
+import java.util.concurrent.TimeUnit;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
-import org.onap.policy.apex.core.infrastructure.messaging.MessagingException;
-import org.onap.policy.apex.core.infrastructure.threading.ThreadUtilities;
-import org.onap.policy.apex.model.basicmodel.concepts.ApexException;
import org.onap.policy.apex.service.engine.main.ApexMain;
import org.onap.policy.common.utils.resources.TextFileUtils;
@@ -57,21 +56,20 @@ public class TestKafka2Kafka {
@ClassRule
public static final SharedKafkaTestResource sharedKafkaTestResource = new SharedKafkaTestResource()
- // Start a cluster with 1 brokers.
- .withBrokers(1)
- // Disable topic auto-creation.
- .withBrokerProperty("auto.create.topics.enable", "false");
+ // Start a cluster with 1 brokers.
+ .withBrokers(1)
+ // Disable topic auto-creation.
+ .withBrokerProperty("auto.create.topics.enable", "false");
/**
* Test json kafka events.
*
- * @throws MessagingException the messaging exception
- * @throws ApexException the apex exception
+ * @throws Exception the apex exception
*/
@Test
- public void testJsonKafkaEvents() throws MessagingException, ApexException {
+ public void testJsonKafkaEvents() throws Exception {
final String conditionedConfigFile = getConditionedConfigFile(
- "target" + File.separator + "examples/config/SampleDomain/Kafka2KafkaJsonEvent.json");
+ "target" + File.separator + "examples/config/SampleDomain/Kafka2KafkaJsonEvent.json");
final String[] args = {"-rfr", "target", "-c", conditionedConfigFile};
testKafkaEvents(args, false, "json");
}
@@ -79,13 +77,12 @@ public class TestKafka2Kafka {
/**
* Test XML kafka events.
*
- * @throws MessagingException the messaging exception
- * @throws ApexException the apex exception
+ * @throws Exception the apex exception
*/
@Test
- public void testXmlKafkaEvents() throws MessagingException, ApexException {
+ public void testXmlKafkaEvents() throws Exception {
final String conditionedConfigFile = getConditionedConfigFile(
- "target" + File.separator + "examples/config/SampleDomain/Kafka2KafkaXmlEvent.json");
+ "target" + File.separator + "examples/config/SampleDomain/Kafka2KafkaXmlEvent.json");
final String[] args = {"-rfr", "target", "-c", conditionedConfigFile};
testKafkaEvents(args, true, "xml");
@@ -97,44 +94,47 @@ public class TestKafka2Kafka {
* @param args the args
* @param xmlEvents the xml events
* @param topicSuffix the topic suffix
- * @throws MessagingException the messaging exception
- * @throws ApexException the apex exception
+ * @throws Exception on errors
*/
- private void testKafkaEvents(String[] args, final Boolean xmlEvents, final String topicSuffix)
- throws MessagingException, ApexException {
+ private void testKafkaEvents(String[] args, final Boolean xmlEvents, final String topicSuffix) throws Exception {
sharedKafkaTestResource.getKafkaTestUtils().createTopic("apex-out-" + topicSuffix, 1, (short) 1);
sharedKafkaTestResource.getKafkaTestUtils().createTopic("apex-in-" + topicSuffix, 1, (short) 1);
final KafkaEventSubscriber subscriber =
- new KafkaEventSubscriber("apex-out-" + topicSuffix, sharedKafkaTestResource);
+ new KafkaEventSubscriber("apex-out-" + topicSuffix, sharedKafkaTestResource);
+
+ await().atMost(30, TimeUnit.SECONDS).until(() -> subscriber.isAlive());
final ApexMain apexMain = new ApexMain(args);
- ThreadUtilities.sleep(3000);
+ await().atMost(10, TimeUnit.SECONDS).until(() -> apexMain.isAlive());
- final KafkaEventProducer producer = new KafkaEventProducer("apex-in-" + topicSuffix, sharedKafkaTestResource,
- EVENT_COUNT, xmlEvents, EVENT_INTERVAL);
+ long initWaitEndTIme = System.currentTimeMillis() + 10000;
- producer.sendEvents();
+ await().atMost(12, TimeUnit.SECONDS).until(() -> initWaitEndTIme < System.currentTimeMillis());
- final long testStartTime = System.currentTimeMillis();
+ final KafkaEventProducer producer = new KafkaEventProducer("apex-in-" + topicSuffix, sharedKafkaTestResource,
+ EVENT_COUNT, xmlEvents, EVENT_INTERVAL);
- // Wait for the producer to send all tis events
- while (System.currentTimeMillis() < testStartTime + MAX_TEST_LENGTH
- && producer.getEventsSentCount() < EVENT_COUNT) {
- ThreadUtilities.sleep(EVENT_INTERVAL);
- }
+ await().atMost(30, TimeUnit.SECONDS).until(() -> producer.isAlive());
- while (System.currentTimeMillis() < testStartTime + MAX_TEST_LENGTH
- && subscriber.getEventsReceivedCount() < EVENT_COUNT) {
- ThreadUtilities.sleep(EVENT_INTERVAL);
- }
+ producer.sendEvents();
+
+ // Wait for the producer to send all its events
+ await().atMost(MAX_TEST_LENGTH, TimeUnit.MILLISECONDS)
+ .until(() -> producer.getEventsSentCount() >= EVENT_COUNT);
- ThreadUtilities.sleep(3000);
+ await().atMost(MAX_TEST_LENGTH, TimeUnit.MILLISECONDS)
+ .until(() -> subscriber.getEventsReceivedCount() >= EVENT_COUNT);
apexMain.shutdown();
+ await().atMost(30, TimeUnit.SECONDS).until(() -> !apexMain.isAlive());
+
subscriber.shutdown();
+ await().atMost(30, TimeUnit.SECONDS).until(() -> !subscriber.isAlive());
+
producer.shutdown();
+ await().atMost(30, TimeUnit.SECONDS).until(() -> !producer.isAlive());
assertEquals(producer.getEventsSentCount(), subscriber.getEventsReceivedCount());
}
@@ -144,7 +144,7 @@ public class TestKafka2Kafka {
File tempConfigFile = File.createTempFile("Kafka_", ".json");
tempConfigFile.deleteOnExit();
String configAsString = TextFileUtils.getTextFileAsString(configurationFileName)
- .replaceAll("localhost:39902", sharedKafkaTestResource.getKafkaConnectString());
+ .replaceAll("localhost:39902", sharedKafkaTestResource.getKafkaConnectString());
TextFileUtils.putStringAsFile(configAsString, tempConfigFile.getCanonicalFile());
return tempConfigFile.getCanonicalPath();
diff --git a/testsuites/integration/integration-uservice-test/src/test/java/org/onap/policy/apex/testsuites/integration/uservice/taskparameters/TestTaskParameters.java b/testsuites/integration/integration-uservice-test/src/test/java/org/onap/policy/apex/testsuites/integration/uservice/taskparameters/TestTaskParameters.java
index da93f919d..3d1a8d7bc 100644
--- a/testsuites/integration/integration-uservice-test/src/test/java/org/onap/policy/apex/testsuites/integration/uservice/taskparameters/TestTaskParameters.java
+++ b/testsuites/integration/integration-uservice-test/src/test/java/org/onap/policy/apex/testsuites/integration/uservice/taskparameters/TestTaskParameters.java
@@ -24,9 +24,11 @@ import static org.awaitility.Awaitility.await;
import static org.junit.Assert.assertTrue;
import java.util.concurrent.TimeUnit;
+
import javax.ws.rs.client.Client;
import javax.ws.rs.client.ClientBuilder;
import javax.ws.rs.core.Response;
+
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
@@ -42,10 +44,9 @@ import org.slf4j.ext.XLogger;
import org.slf4j.ext.XLoggerFactory;
/**
- * This class runs integration tests for taskParameters.
- * Task parameters are read from the ApexConfig, and they can be accessed in task logic.
- * In this case, the taskParameters are used to set values in executionProperties.
- * URL dynamically populated using executionProperties is hit and values get updated in
+ * This class runs integration tests for taskParameters. Task parameters are read from the ApexConfig, and they can be
+ * accessed in task logic. In this case, the taskParameters are used to set values in executionProperties. URL
+ * dynamically populated using executionProperties is hit and values get updated in
* {@link RestClientEndpointForTaskParameters} which acts as a temporary server for requests.
*/
public class TestTaskParameters {
@@ -121,8 +122,8 @@ public class TestTaskParameters {
}
/**
- * Test taskParameters with no taskIds.
- * When taskIds are not provided, all taskParameters provided in config will be updated to all tasks.
+ * Test taskParameters with no taskIds. When taskIds are not provided, all taskParameters provided in config will be
+ * updated to all tasks.
*/
@Test
public void testTaskParameters_with_noTaskIds() throws Exception {
@@ -132,8 +133,8 @@ public class TestTaskParameters {
}
/**
- * Test taskParameters with valid taskIds.
- * When valid taskIds are provided, the the taskParameter will be updated in that particular task alone.
+ * Test taskParameters with valid taskIds. When valid taskIds are provided, the the taskParameter will be updated in
+ * that particular task alone.
*/
@Test
public void testTaskParameters_with_validTaskIds() throws Exception {
@@ -143,9 +144,9 @@ public class TestTaskParameters {
}
/**
- * Test taskParameters with invalid taskIds.
- * When invalid taskIds are provided, or when a taskParameter assigned to a particular taskId is tried to be
- * accessed in a taskLogic of a different task, such taskParameters won't be accessible in the task
+ * Test taskParameters with invalid taskIds. When invalid taskIds are provided, or when a taskParameter assigned to
+ * a particular taskId is tried to be accessed in a taskLogic of a different task, such taskParameters won't be
+ * accessible in the task
*/
@Test
public void testTaskParameters_with_invalidTaskIds() throws Exception {
@@ -165,7 +166,7 @@ public class TestTaskParameters {
String getDetailsUrl = "http://" + HOST + ":" + PORT + "/TestTaskParametersRest/apex/event/getDetails";
// wait for success response code to be received, until a timeout
- await().atMost(2000, TimeUnit.MILLISECONDS)
+ await().atMost(5, TimeUnit.SECONDS)
.until(() -> 200 == client.target(getDetailsUrl).request("application/json").get().getStatus());
apexMain.shutdown();
Response response = client.target(getDetailsUrl).request("application/json").get();
diff --git a/testsuites/integration/integration-uservice-test/src/test/resources/logback-test.xml b/testsuites/integration/integration-uservice-test/src/test/resources/logback-test.xml
index fbe9ffc29..341a9fdaa 100644
--- a/testsuites/integration/integration-uservice-test/src/test/resources/logback-test.xml
+++ b/testsuites/integration/integration-uservice-test/src/test/resources/logback-test.xml
@@ -35,4 +35,18 @@
<appender-ref ref="STDOUT" />
</root>
+
+ <logger name="org.onap.policy.apex" level="INFO" additivity="false">
+ <appender-ref ref="STDOUT" />
+ </logger>
+
+ <logger name="org.onap.policy.apex.core" level="INFO" additivity="false">
+ <appender-ref ref="STDOUT" />
+ </logger>
+
+ <logger name="org.onap.policy.apex.plugins.executor" level="INFO" additivity="false">
+ <appender-ref ref="STDOUT" />
+ </logger>
+
+
</configuration>