aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorliamfallon <liam.fallon@est.tech>2020-04-02 20:54:52 +0100
committerliamfallon <liam.fallon@est.tech>2020-04-03 19:20:26 +0100
commitf134a5eb8bc9ddb6c1dea1a58d440bfdec6dab5c (patch)
treeac4bb39fac4f45637a23a6a72b7e148689f436f1
parent640aaf64a0b28b53a7425c17b9065a46c29d3587 (diff)
Fix failing Kafka tests
All the Kafka components need something near 10 seconds to come up completely. This review tweaks the timing to allow the test Kafka server to come up and to allow the consumers to connect to it. Issue-ID: POLICY-2106 Change-Id: I6dd8ace0848bdc2549e658ef8908b4d85d5ea789 Signed-off-by: liamfallon <liam.fallon@est.tech>
-rw-r--r--plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumer.java65
-rw-r--r--plugins/plugins-executor/plugins-executor-javascript/src/main/java/org/onap/policy/apex/plugins/executor/javascript/JavascriptExecutor.java12
-rw-r--r--plugins/plugins-executor/plugins-executor-javascript/src/test/java/org/onap/policy/apex/plugins/executor/javascript/JavascriptExecutorTest.java10
-rw-r--r--testsuites/integration/integration-uservice-test/src/test/java/org/onap/policy/apex/testsuites/integration/uservice/adapt/kafka/KafkaEventProducer.java18
-rw-r--r--testsuites/integration/integration-uservice-test/src/test/java/org/onap/policy/apex/testsuites/integration/uservice/adapt/kafka/KafkaEventSubscriber.java20
-rw-r--r--testsuites/integration/integration-uservice-test/src/test/java/org/onap/policy/apex/testsuites/integration/uservice/adapt/kafka/TestKafka2Kafka.java72
-rw-r--r--testsuites/integration/integration-uservice-test/src/test/java/org/onap/policy/apex/testsuites/integration/uservice/taskparameters/TestTaskParameters.java25
-rw-r--r--testsuites/integration/integration-uservice-test/src/test/resources/logback-test.xml14
8 files changed, 127 insertions, 109 deletions
diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumer.java b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumer.java
index 947dd5466..591f83237 100644
--- a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumer.java
+++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumer.java
@@ -49,70 +49,53 @@ public class ApexKafkaConsumer extends ApexPluginsEventConsumer {
// The event receiver that will receive events from this consumer
private ApexEventReceiver eventReceiver;
- // The Kafka consumer used to receive events using Kafka
- private KafkaConsumer<String, String> kafkaConsumer;
-
/**
* {@inheritDoc}.
*/
@Override
public void init(final String consumerName, final EventHandlerParameters consumerParameters,
- final ApexEventReceiver incomingEventReceiver) throws ApexEventException {
+ final ApexEventReceiver incomingEventReceiver) throws ApexEventException {
this.eventReceiver = incomingEventReceiver;
this.name = consumerName;
// Check and get the Kafka Properties
if (!(consumerParameters.getCarrierTechnologyParameters() instanceof KafkaCarrierTechnologyParameters)) {
- LOGGER.warn("specified consumer properties of type \""
- + consumerParameters.getCarrierTechnologyParameters().getClass().getName()
- + "\" are not applicable to a Kafka consumer");
throw new ApexEventException("specified consumer properties of type \""
- + consumerParameters.getCarrierTechnologyParameters().getClass().getName()
- + "\" are not applicable to a Kafka consumer");
- }
- kafkaConsumerProperties = (KafkaCarrierTechnologyParameters) consumerParameters
- .getCarrierTechnologyParameters();
-
- // Kick off the Kafka consumer
- kafkaConsumer = new KafkaConsumer<>(kafkaConsumerProperties.getKafkaConsumerProperties());
- kafkaConsumer.subscribe(kafkaConsumerProperties.getConsumerTopicListAsCollection());
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("event receiver for " + this.getClass().getName() + ":" + this.name + " subscribed to topics: "
- + kafkaConsumerProperties.getConsumerTopicList());
+ + consumerParameters.getCarrierTechnologyParameters().getClass().getName()
+ + "\" are not applicable to a Kafka consumer");
}
+ kafkaConsumerProperties =
+ (KafkaCarrierTechnologyParameters) consumerParameters.getCarrierTechnologyParameters();
}
-
/**
* {@inheritDoc}.
*/
@Override
public void run() {
// Kick off the Kafka consumer
- kafkaConsumer = new KafkaConsumer<>(kafkaConsumerProperties.getKafkaConsumerProperties());
- kafkaConsumer.subscribe(kafkaConsumerProperties.getConsumerTopicListAsCollection());
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("event receiver for " + this.getClass().getName() + ":" + this.name + " subscribed to topics: "
- + kafkaConsumerProperties.getConsumerTopicList());
- }
+ try (KafkaConsumer<String, String> kafkaConsumer =
+ new KafkaConsumer<>(kafkaConsumerProperties.getKafkaConsumerProperties())) {
+ kafkaConsumer.subscribe(kafkaConsumerProperties.getConsumerTopicListAsCollection());
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("event receiver for {}:{} subscribed to topics: {}", this.getClass().getName(), this.name,
+ kafkaConsumerProperties.getConsumerTopicList());
+ }
- // The endless loop that receives events over Kafka
- while (consumerThread.isAlive() && !stopOrderedFlag) {
- try {
- final ConsumerRecords<String, String> records = kafkaConsumer
- .poll(kafkaConsumerProperties.getConsumerPollDuration());
- for (final ConsumerRecord<String, String> record : records) {
- traceIfTraceEnabled(record);
- eventReceiver.receiveEvent(new Properties(), record.value());
+ // The endless loop that receives events over Kafka
+ while (consumerThread.isAlive() && !stopOrderedFlag) {
+ try {
+ final ConsumerRecords<String, String> records =
+ kafkaConsumer.poll(kafkaConsumerProperties.getConsumerPollDuration());
+ for (final ConsumerRecord<String, String> record : records) {
+ traceIfTraceEnabled(record);
+ eventReceiver.receiveEvent(new Properties(), record.value());
+ }
+ } catch (final Exception e) {
+ LOGGER.warn("error receiving events on thread {}", consumerThread.getName(), e);
}
- } catch (final Exception e) {
- LOGGER.warn("error receiving events on thread {}", consumerThread.getName(), e);
}
}
-
- if (!consumerThread.isInterrupted()) {
- kafkaConsumer.close();
- }
}
/**
@@ -123,7 +106,7 @@ public class ApexKafkaConsumer extends ApexPluginsEventConsumer {
private void traceIfTraceEnabled(final ConsumerRecord<String, String> record) {
if (LOGGER.isTraceEnabled()) {
LOGGER.trace("event received for {} for forwarding to Apex engine : {} {}",
- this.getClass().getName() + ":" + this.name, record.key(), record.value());
+ this.getClass().getName() + ":" + this.name, record.key(), record.value());
}
}
diff --git a/plugins/plugins-executor/plugins-executor-javascript/src/main/java/org/onap/policy/apex/plugins/executor/javascript/JavascriptExecutor.java b/plugins/plugins-executor/plugins-executor-javascript/src/main/java/org/onap/policy/apex/plugins/executor/javascript/JavascriptExecutor.java
index 489489ff0..f70bf580b 100644
--- a/plugins/plugins-executor/plugins-executor-javascript/src/main/java/org/onap/policy/apex/plugins/executor/javascript/JavascriptExecutor.java
+++ b/plugins/plugins-executor/plugins-executor-javascript/src/main/java/org/onap/policy/apex/plugins/executor/javascript/JavascriptExecutor.java
@@ -56,6 +56,7 @@ public class JavascriptExecutor implements Runnable {
// Recurring string constants
private static final String WITH_MESSAGE = " with message: ";
private static final String JAVASCRIPT_EXECUTOR = "JavascriptExecutor ";
+ private static final String EXECUTION_FAILED_EXECUTOR = "execution failed, executor ";
@Setter(AccessLevel.PROTECTED)
private static TimeUnit timeunit4Latches = TimeUnit.SECONDS;
@@ -132,6 +133,11 @@ public class JavascriptExecutor implements Runnable {
Thread.currentThread().interrupt();
}
+ if (executorException.get() != null) {
+ executorThread.interrupt();
+ checkAndThrowExecutorException();
+ }
+
checkAndThrowExecutorException();
LOGGER.debug("JavascriptExecutor {} started ... ", subjectKey.getId());
@@ -146,11 +152,11 @@ public class JavascriptExecutor implements Runnable {
*/
public synchronized boolean execute(final Object executionContext) throws StateMachineException {
if (executorThread == null) {
- throw new StateMachineException("execution failed, executor " + subjectKey.getId() + " is not initialized");
+ throw new StateMachineException(EXECUTION_FAILED_EXECUTOR + subjectKey.getId() + " is not initialized");
}
- if (!executorThread.isAlive()) {
- throw new StateMachineException("execution failed, executor " + subjectKey.getId()
+ if (!executorThread.isAlive() || executorThread.isInterrupted()) {
+ throw new StateMachineException(EXECUTION_FAILED_EXECUTOR + subjectKey.getId()
+ " is not running, run cleanUp to clear executor and init to restart executor");
}
diff --git a/plugins/plugins-executor/plugins-executor-javascript/src/test/java/org/onap/policy/apex/plugins/executor/javascript/JavascriptExecutorTest.java b/plugins/plugins-executor/plugins-executor-javascript/src/test/java/org/onap/policy/apex/plugins/executor/javascript/JavascriptExecutorTest.java
index 6ea15fc35..56ebf9972 100644
--- a/plugins/plugins-executor/plugins-executor-javascript/src/test/java/org/onap/policy/apex/plugins/executor/javascript/JavascriptExecutorTest.java
+++ b/plugins/plugins-executor/plugins-executor-javascript/src/test/java/org/onap/policy/apex/plugins/executor/javascript/JavascriptExecutorTest.java
@@ -34,8 +34,12 @@ import org.junit.Before;
import org.junit.Test;
import org.onap.policy.apex.core.engine.executor.exception.StateMachineException;
import org.onap.policy.apex.model.basicmodel.concepts.AxArtifactKey;
+import org.slf4j.ext.XLogger;
+import org.slf4j.ext.XLoggerFactory;
public class JavascriptExecutorTest {
+ private static final XLogger LOGGER = XLoggerFactory.getXLogger(JavascriptExecutorTest.class);
+
private AtomicBoolean concurrentResult = new AtomicBoolean();
@Before
@@ -277,12 +281,14 @@ public class JavascriptExecutorTest {
public void run() {
try {
while (executor.execute("hello")) {
+ LOGGER.debug("test thread running . . .");
// Loop until interrupted
}
- concurrentResult.set(false);
} catch (StateMachineException e) {
- // Do nothing
+ LOGGER.debug("test thread caught exception", e);
}
+ concurrentResult.set(false);
+ LOGGER.debug("test thread exited");
}
}).start();
diff --git a/testsuites/integration/integration-uservice-test/src/test/java/org/onap/policy/apex/testsuites/integration/uservice/adapt/kafka/KafkaEventProducer.java b/testsuites/integration/integration-uservice-test/src/test/java/org/onap/policy/apex/testsuites/integration/uservice/adapt/kafka/KafkaEventProducer.java
index c214b72a8..66a928677 100644
--- a/testsuites/integration/integration-uservice-test/src/test/java/org/onap/policy/apex/testsuites/integration/uservice/adapt/kafka/KafkaEventProducer.java
+++ b/testsuites/integration/integration-uservice-test/src/test/java/org/onap/policy/apex/testsuites/integration/uservice/adapt/kafka/KafkaEventProducer.java
@@ -1,7 +1,7 @@
/*-
* ============LICENSE_START=======================================================
* Copyright (C) 2016-2018 Ericsson. All rights reserved.
- * Modifications Copyright (C) 2019 Nordix Foundation.
+ * Modifications Copyright (C) 2019-2020 Nordix Foundation.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -51,7 +51,7 @@ public class KafkaEventProducer implements Runnable {
private final Thread producerThread;
private boolean sendEventsFlag = false;
- private boolean stopFlag = false;
+ private volatile boolean stopFlag = false;
/**
* Instantiates a new kafka event producer.
@@ -63,7 +63,7 @@ public class KafkaEventProducer implements Runnable {
* @param eventInterval the event interval
*/
public KafkaEventProducer(final String topic, final SharedKafkaTestResource sharedKafkaTestResource,
- final int eventCount, final boolean xmlEvents, final long eventInterval) {
+ final int eventCount, final boolean xmlEvents, final long eventInterval) {
this.topic = topic;
this.sharedKafkaTestResource = sharedKafkaTestResource;
this.eventCount = eventCount;
@@ -80,7 +80,7 @@ public class KafkaEventProducer implements Runnable {
@Override
public void run() {
final Producer<String, String> producer = sharedKafkaTestResource.getKafkaTestUtils()
- .getKafkaProducer(StringSerializer.class, StringSerializer.class);
+ .getKafkaProducer(StringSerializer.class, StringSerializer.class);
while (producerThread.isAlive() && !stopFlag) {
ThreadUtilities.sleep(50);
@@ -108,11 +108,11 @@ public class KafkaEventProducer implements Runnable {
*/
private void sendEventsToTopic(final Producer<String, String> producer) {
LOGGER.debug("{} : sending events to Kafka server, event count {}, xmlEvents {}",
- KafkaEventProducer.class.getName(), eventCount, xmlEvents);
+ KafkaEventProducer.class.getName(), eventCount, xmlEvents);
for (int i = 0; i < eventCount; i++) {
LOGGER.debug("{} : waiting {} milliseconds before sending next event", KafkaEventProducer.class.getName(),
- eventInterval);
+ eventInterval);
ThreadUtilities.sleep(eventInterval);
String eventString = null;
@@ -124,7 +124,7 @@ public class KafkaEventProducer implements Runnable {
producer.send(new ProducerRecord<String, String>(topic, "Event" + i + "Of" + eventCount, eventString));
producer.flush();
eventsSentCount++;
- LOGGER.debug("****** Sent event No. {} ******", eventsSentCount);
+ LOGGER.debug("****** Sent event No. {} ******\n{}", eventsSentCount, eventString);
}
LOGGER.debug("{}: completed", KafkaEventProducer.class.getName());
}
@@ -152,4 +152,8 @@ public class KafkaEventProducer implements Runnable {
LOGGER.debug("{} : stopped", KafkaEventProducer.class.getName());
}
+
+ public boolean isAlive() {
+ return producerThread.isAlive();
+ }
}
diff --git a/testsuites/integration/integration-uservice-test/src/test/java/org/onap/policy/apex/testsuites/integration/uservice/adapt/kafka/KafkaEventSubscriber.java b/testsuites/integration/integration-uservice-test/src/test/java/org/onap/policy/apex/testsuites/integration/uservice/adapt/kafka/KafkaEventSubscriber.java
index 1bb2e8021..68ca87953 100644
--- a/testsuites/integration/integration-uservice-test/src/test/java/org/onap/policy/apex/testsuites/integration/uservice/adapt/kafka/KafkaEventSubscriber.java
+++ b/testsuites/integration/integration-uservice-test/src/test/java/org/onap/policy/apex/testsuites/integration/uservice/adapt/kafka/KafkaEventSubscriber.java
@@ -1,7 +1,7 @@
/*-
* ============LICENSE_START=======================================================
* Copyright (C) 2016-2018 Ericsson. All rights reserved.
- * Modifications Copyright (C) 2019 Nordix Foundation.
+ * Modifications Copyright (C) 2019-2020 Nordix Foundation.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -52,7 +52,7 @@ public class KafkaEventSubscriber implements Runnable {
KafkaConsumer<String, String> consumer;
- Thread subscriberThread;
+ private final Thread subscriberThread;
/**
* Instantiates a new kafka event subscriber.
@@ -61,15 +61,15 @@ public class KafkaEventSubscriber implements Runnable {
* @param sharedKafkaTestResource the kafka server address
* @throws MessagingException the messaging exception
*/
- public KafkaEventSubscriber(final String topic, final SharedKafkaTestResource sharedKafkaTestResource)
- throws MessagingException {
+ public KafkaEventSubscriber(final String topic,
+ final SharedKafkaTestResource sharedKafkaTestResource) throws MessagingException {
this.topic = topic;
final Properties consumerProperties = new Properties();
consumerProperties.put("group.id", "test");
consumer = sharedKafkaTestResource.getKafkaTestUtils().getKafkaConsumer(StringDeserializer.class,
- StringDeserializer.class, consumerProperties);
+ StringDeserializer.class, consumerProperties);
consumer.subscribe(Arrays.asList(topic));
subscriberThread = new Thread(this);
@@ -82,15 +82,15 @@ public class KafkaEventSubscriber implements Runnable {
@Override
public void run() {
LOGGER.debug("{}: receiving events from Kafka server on topic {}", KafkaEventSubscriber.class.getName(),
- topic);
+ topic);
while (subscriberThread.isAlive() && !subscriberThread.isInterrupted()) {
try {
final ConsumerRecords<String, String> records = consumer.poll(POLL_DURATION);
for (final ConsumerRecord<String, String> record : records) {
eventsReceivedCount++;
- LOGGER.debug("****** Received event No. {} ******\noffset={}\nkey={}", eventsReceivedCount,
- record.offset(), record.key());
+ LOGGER.debug("****** Received event No. {} ******\noffset={}\nkey={}\n{}", eventsReceivedCount,
+ record.offset(), record.key(), record.value());
}
} catch (final Exception e) {
// Thread interrupted
@@ -123,4 +123,8 @@ public class KafkaEventSubscriber implements Runnable {
consumer.close();
LOGGER.debug("{} : stopped", KafkaEventSubscriber.class.getName());
}
+
+ public boolean isAlive() {
+ return subscriberThread.isAlive();
+ }
}
diff --git a/testsuites/integration/integration-uservice-test/src/test/java/org/onap/policy/apex/testsuites/integration/uservice/adapt/kafka/TestKafka2Kafka.java b/testsuites/integration/integration-uservice-test/src/test/java/org/onap/policy/apex/testsuites/integration/uservice/adapt/kafka/TestKafka2Kafka.java
index b35b961e4..c3289887f 100644
--- a/testsuites/integration/integration-uservice-test/src/test/java/org/onap/policy/apex/testsuites/integration/uservice/adapt/kafka/TestKafka2Kafka.java
+++ b/testsuites/integration/integration-uservice-test/src/test/java/org/onap/policy/apex/testsuites/integration/uservice/adapt/kafka/TestKafka2Kafka.java
@@ -21,6 +21,7 @@
package org.onap.policy.apex.testsuites.integration.uservice.adapt.kafka;
+import static org.awaitility.Awaitility.await;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
@@ -28,13 +29,11 @@ import com.salesforce.kafka.test.junit4.SharedKafkaTestResource;
import java.io.File;
import java.io.IOException;
+import java.util.concurrent.TimeUnit;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
-import org.onap.policy.apex.core.infrastructure.messaging.MessagingException;
-import org.onap.policy.apex.core.infrastructure.threading.ThreadUtilities;
-import org.onap.policy.apex.model.basicmodel.concepts.ApexException;
import org.onap.policy.apex.service.engine.main.ApexMain;
import org.onap.policy.common.utils.resources.TextFileUtils;
@@ -57,21 +56,20 @@ public class TestKafka2Kafka {
@ClassRule
public static final SharedKafkaTestResource sharedKafkaTestResource = new SharedKafkaTestResource()
- // Start a cluster with 1 brokers.
- .withBrokers(1)
- // Disable topic auto-creation.
- .withBrokerProperty("auto.create.topics.enable", "false");
+ // Start a cluster with 1 brokers.
+ .withBrokers(1)
+ // Disable topic auto-creation.
+ .withBrokerProperty("auto.create.topics.enable", "false");
/**
* Test json kafka events.
*
- * @throws MessagingException the messaging exception
- * @throws ApexException the apex exception
+ * @throws Exception the apex exception
*/
@Test
- public void testJsonKafkaEvents() throws MessagingException, ApexException {
+ public void testJsonKafkaEvents() throws Exception {
final String conditionedConfigFile = getConditionedConfigFile(
- "target" + File.separator + "examples/config/SampleDomain/Kafka2KafkaJsonEvent.json");
+ "target" + File.separator + "examples/config/SampleDomain/Kafka2KafkaJsonEvent.json");
final String[] args = {"-rfr", "target", "-c", conditionedConfigFile};
testKafkaEvents(args, false, "json");
}
@@ -79,13 +77,12 @@ public class TestKafka2Kafka {
/**
* Test XML kafka events.
*
- * @throws MessagingException the messaging exception
- * @throws ApexException the apex exception
+ * @throws Exception the apex exception
*/
@Test
- public void testXmlKafkaEvents() throws MessagingException, ApexException {
+ public void testXmlKafkaEvents() throws Exception {
final String conditionedConfigFile = getConditionedConfigFile(
- "target" + File.separator + "examples/config/SampleDomain/Kafka2KafkaXmlEvent.json");
+ "target" + File.separator + "examples/config/SampleDomain/Kafka2KafkaXmlEvent.json");
final String[] args = {"-rfr", "target", "-c", conditionedConfigFile};
testKafkaEvents(args, true, "xml");
@@ -97,44 +94,47 @@ public class TestKafka2Kafka {
* @param args the args
* @param xmlEvents the xml events
* @param topicSuffix the topic suffix
- * @throws MessagingException the messaging exception
- * @throws ApexException the apex exception
+ * @throws Exception on errors
*/
- private void testKafkaEvents(String[] args, final Boolean xmlEvents, final String topicSuffix)
- throws MessagingException, ApexException {
+ private void testKafkaEvents(String[] args, final Boolean xmlEvents, final String topicSuffix) throws Exception {
sharedKafkaTestResource.getKafkaTestUtils().createTopic("apex-out-" + topicSuffix, 1, (short) 1);
sharedKafkaTestResource.getKafkaTestUtils().createTopic("apex-in-" + topicSuffix, 1, (short) 1);
final KafkaEventSubscriber subscriber =
- new KafkaEventSubscriber("apex-out-" + topicSuffix, sharedKafkaTestResource);
+ new KafkaEventSubscriber("apex-out-" + topicSuffix, sharedKafkaTestResource);
+
+ await().atMost(30, TimeUnit.SECONDS).until(() -> subscriber.isAlive());
final ApexMain apexMain = new ApexMain(args);
- ThreadUtilities.sleep(3000);
+ await().atMost(10, TimeUnit.SECONDS).until(() -> apexMain.isAlive());
- final KafkaEventProducer producer = new KafkaEventProducer("apex-in-" + topicSuffix, sharedKafkaTestResource,
- EVENT_COUNT, xmlEvents, EVENT_INTERVAL);
+ long initWaitEndTIme = System.currentTimeMillis() + 10000;
- producer.sendEvents();
+ await().atMost(12, TimeUnit.SECONDS).until(() -> initWaitEndTIme < System.currentTimeMillis());
- final long testStartTime = System.currentTimeMillis();
+ final KafkaEventProducer producer = new KafkaEventProducer("apex-in-" + topicSuffix, sharedKafkaTestResource,
+ EVENT_COUNT, xmlEvents, EVENT_INTERVAL);
- // Wait for the producer to send all tis events
- while (System.currentTimeMillis() < testStartTime + MAX_TEST_LENGTH
- && producer.getEventsSentCount() < EVENT_COUNT) {
- ThreadUtilities.sleep(EVENT_INTERVAL);
- }
+ await().atMost(30, TimeUnit.SECONDS).until(() -> producer.isAlive());
- while (System.currentTimeMillis() < testStartTime + MAX_TEST_LENGTH
- && subscriber.getEventsReceivedCount() < EVENT_COUNT) {
- ThreadUtilities.sleep(EVENT_INTERVAL);
- }
+ producer.sendEvents();
+
+ // Wait for the producer to send all its events
+ await().atMost(MAX_TEST_LENGTH, TimeUnit.MILLISECONDS)
+ .until(() -> producer.getEventsSentCount() >= EVENT_COUNT);
- ThreadUtilities.sleep(3000);
+ await().atMost(MAX_TEST_LENGTH, TimeUnit.MILLISECONDS)
+ .until(() -> subscriber.getEventsReceivedCount() >= EVENT_COUNT);
apexMain.shutdown();
+ await().atMost(30, TimeUnit.SECONDS).until(() -> !apexMain.isAlive());
+
subscriber.shutdown();
+ await().atMost(30, TimeUnit.SECONDS).until(() -> !subscriber.isAlive());
+
producer.shutdown();
+ await().atMost(30, TimeUnit.SECONDS).until(() -> !producer.isAlive());
assertEquals(producer.getEventsSentCount(), subscriber.getEventsReceivedCount());
}
@@ -144,7 +144,7 @@ public class TestKafka2Kafka {
File tempConfigFile = File.createTempFile("Kafka_", ".json");
tempConfigFile.deleteOnExit();
String configAsString = TextFileUtils.getTextFileAsString(configurationFileName)
- .replaceAll("localhost:39902", sharedKafkaTestResource.getKafkaConnectString());
+ .replaceAll("localhost:39902", sharedKafkaTestResource.getKafkaConnectString());
TextFileUtils.putStringAsFile(configAsString, tempConfigFile.getCanonicalFile());
return tempConfigFile.getCanonicalPath();
diff --git a/testsuites/integration/integration-uservice-test/src/test/java/org/onap/policy/apex/testsuites/integration/uservice/taskparameters/TestTaskParameters.java b/testsuites/integration/integration-uservice-test/src/test/java/org/onap/policy/apex/testsuites/integration/uservice/taskparameters/TestTaskParameters.java
index da93f919d..3d1a8d7bc 100644
--- a/testsuites/integration/integration-uservice-test/src/test/java/org/onap/policy/apex/testsuites/integration/uservice/taskparameters/TestTaskParameters.java
+++ b/testsuites/integration/integration-uservice-test/src/test/java/org/onap/policy/apex/testsuites/integration/uservice/taskparameters/TestTaskParameters.java
@@ -24,9 +24,11 @@ import static org.awaitility.Awaitility.await;
import static org.junit.Assert.assertTrue;
import java.util.concurrent.TimeUnit;
+
import javax.ws.rs.client.Client;
import javax.ws.rs.client.ClientBuilder;
import javax.ws.rs.core.Response;
+
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
@@ -42,10 +44,9 @@ import org.slf4j.ext.XLogger;
import org.slf4j.ext.XLoggerFactory;
/**
- * This class runs integration tests for taskParameters.
- * Task parameters are read from the ApexConfig, and they can be accessed in task logic.
- * In this case, the taskParameters are used to set values in executionProperties.
- * URL dynamically populated using executionProperties is hit and values get updated in
+ * This class runs integration tests for taskParameters. Task parameters are read from the ApexConfig, and they can be
+ * accessed in task logic. In this case, the taskParameters are used to set values in executionProperties. URL
+ * dynamically populated using executionProperties is hit and values get updated in
* {@link RestClientEndpointForTaskParameters} which acts as a temporary server for requests.
*/
public class TestTaskParameters {
@@ -121,8 +122,8 @@ public class TestTaskParameters {
}
/**
- * Test taskParameters with no taskIds.
- * When taskIds are not provided, all taskParameters provided in config will be updated to all tasks.
+ * Test taskParameters with no taskIds. When taskIds are not provided, all taskParameters provided in config will be
+ * updated to all tasks.
*/
@Test
public void testTaskParameters_with_noTaskIds() throws Exception {
@@ -132,8 +133,8 @@ public class TestTaskParameters {
}
/**
- * Test taskParameters with valid taskIds.
- * When valid taskIds are provided, the the taskParameter will be updated in that particular task alone.
+ * Test taskParameters with valid taskIds. When valid taskIds are provided, the the taskParameter will be updated in
+ * that particular task alone.
*/
@Test
public void testTaskParameters_with_validTaskIds() throws Exception {
@@ -143,9 +144,9 @@ public class TestTaskParameters {
}
/**
- * Test taskParameters with invalid taskIds.
- * When invalid taskIds are provided, or when a taskParameter assigned to a particular taskId is tried to be
- * accessed in a taskLogic of a different task, such taskParameters won't be accessible in the task
+ * Test taskParameters with invalid taskIds. When invalid taskIds are provided, or when a taskParameter assigned to
+ * a particular taskId is tried to be accessed in a taskLogic of a different task, such taskParameters won't be
+ * accessible in the task
*/
@Test
public void testTaskParameters_with_invalidTaskIds() throws Exception {
@@ -165,7 +166,7 @@ public class TestTaskParameters {
String getDetailsUrl = "http://" + HOST + ":" + PORT + "/TestTaskParametersRest/apex/event/getDetails";
// wait for success response code to be received, until a timeout
- await().atMost(2000, TimeUnit.MILLISECONDS)
+ await().atMost(5, TimeUnit.SECONDS)
.until(() -> 200 == client.target(getDetailsUrl).request("application/json").get().getStatus());
apexMain.shutdown();
Response response = client.target(getDetailsUrl).request("application/json").get();
diff --git a/testsuites/integration/integration-uservice-test/src/test/resources/logback-test.xml b/testsuites/integration/integration-uservice-test/src/test/resources/logback-test.xml
index fbe9ffc29..341a9fdaa 100644
--- a/testsuites/integration/integration-uservice-test/src/test/resources/logback-test.xml
+++ b/testsuites/integration/integration-uservice-test/src/test/resources/logback-test.xml
@@ -35,4 +35,18 @@
<appender-ref ref="STDOUT" />
</root>
+
+ <logger name="org.onap.policy.apex" level="INFO" additivity="false">
+ <appender-ref ref="STDOUT" />
+ </logger>
+
+ <logger name="org.onap.policy.apex.core" level="INFO" additivity="false">
+ <appender-ref ref="STDOUT" />
+ </logger>
+
+ <logger name="org.onap.policy.apex.plugins.executor" level="INFO" additivity="false">
+ <appender-ref ref="STDOUT" />
+ </logger>
+
+
</configuration>