-rw-r--r--  plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumer.java | 65
-rw-r--r--  plugins/plugins-executor/plugins-executor-javascript/src/main/java/org/onap/policy/apex/plugins/executor/javascript/JavascriptExecutor.java | 12
-rw-r--r--  plugins/plugins-executor/plugins-executor-javascript/src/test/java/org/onap/policy/apex/plugins/executor/javascript/JavascriptExecutorTest.java | 10
-rw-r--r--  testsuites/integration/integration-uservice-test/src/test/java/org/onap/policy/apex/testsuites/integration/uservice/adapt/kafka/KafkaEventProducer.java | 18
-rw-r--r--  testsuites/integration/integration-uservice-test/src/test/java/org/onap/policy/apex/testsuites/integration/uservice/adapt/kafka/KafkaEventSubscriber.java | 20
-rw-r--r--  testsuites/integration/integration-uservice-test/src/test/java/org/onap/policy/apex/testsuites/integration/uservice/adapt/kafka/TestKafka2Kafka.java | 72
-rw-r--r--  testsuites/integration/integration-uservice-test/src/test/java/org/onap/policy/apex/testsuites/integration/uservice/taskparameters/TestTaskParameters.java | 25
-rw-r--r--  testsuites/integration/integration-uservice-test/src/test/resources/logback-test.xml | 14
8 files changed, 127 insertions(+), 109 deletions(-)
diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumer.java b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumer.java
index 947dd5466..591f83237 100644
--- a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumer.java
+++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumer.java
@@ -49,70 +49,53 @@ public class ApexKafkaConsumer extends ApexPluginsEventConsumer {
// The event receiver that will receive events from this consumer
private ApexEventReceiver eventReceiver;
- // The Kafka consumer used to receive events using Kafka
- private KafkaConsumer<String, String> kafkaConsumer;
-
/**
* {@inheritDoc}.
*/
@Override
public void init(final String consumerName, final EventHandlerParameters consumerParameters,
- final ApexEventReceiver incomingEventReceiver) throws ApexEventException {
+ final ApexEventReceiver incomingEventReceiver) throws ApexEventException {
this.eventReceiver = incomingEventReceiver;
this.name = consumerName;
// Check and get the Kafka Properties
if (!(consumerParameters.getCarrierTechnologyParameters() instanceof KafkaCarrierTechnologyParameters)) {
- LOGGER.warn("specified consumer properties of type \""
- + consumerParameters.getCarrierTechnologyParameters().getClass().getName()
- + "\" are not applicable to a Kafka consumer");
throw new ApexEventException("specified consumer properties of type \""
- + consumerParameters.getCarrierTechnologyParameters().getClass().getName()
- + "\" are not applicable to a Kafka consumer");
- }
- kafkaConsumerProperties = (KafkaCarrierTechnologyParameters) consumerParameters
- .getCarrierTechnologyParameters();
-
- // Kick off the Kafka consumer
- kafkaConsumer = new KafkaConsumer<>(kafkaConsumerProperties.getKafkaConsumerProperties());
- kafkaConsumer.subscribe(kafkaConsumerProperties.getConsumerTopicListAsCollection());
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("event receiver for " + this.getClass().getName() + ":" + this.name + " subscribed to topics: "
- + kafkaConsumerProperties.getConsumerTopicList());
+ + consumerParameters.getCarrierTechnologyParameters().getClass().getName()
+ + "\" are not applicable to a Kafka consumer");
}
+ kafkaConsumerProperties =
+ (KafkaCarrierTechnologyParameters) consumerParameters.getCarrierTechnologyParameters();
}
-
/**
* {@inheritDoc}.
*/
@Override
public void run() {
// Kick off the Kafka consumer
- kafkaConsumer = new KafkaConsumer<>(kafkaConsumerProperties.getKafkaConsumerProperties());
- kafkaConsumer.subscribe(kafkaConsumerProperties.getConsumerTopicListAsCollection());
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("event receiver for " + this.getClass().getName() + ":" + this.name + " subscribed to topics: "
- + kafkaConsumerProperties.getConsumerTopicList());
- }
+ try (KafkaConsumer<String, String> kafkaConsumer =
+ new KafkaConsumer<>(kafkaConsumerProperties.getKafkaConsumerProperties())) {
+ kafkaConsumer.subscribe(kafkaConsumerProperties.getConsumerTopicListAsCollection());
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("event receiver for {}:{} subscribed to topics: {}", this.getClass().getName(), this.name,
+ kafkaConsumerProperties.getConsumerTopicList());
+ }
- // The endless loop that receives events over Kafka
- while (consumerThread.isAlive() && !stopOrderedFlag) {
- try {
- final ConsumerRecords<String, String> records = kafkaConsumer
- .poll(kafkaConsumerProperties.getConsumerPollDuration());
- for (final ConsumerRecord<String, String> record : records) {
- traceIfTraceEnabled(record);
- eventReceiver.receiveEvent(new Properties(), record.value());
+ // The endless loop that receives events over Kafka
+ while (consumerThread.isAlive() && !stopOrderedFlag) {
+ try {
+ final ConsumerRecords<String, String> records =
+ kafkaConsumer.poll(kafkaConsumerProperties.getConsumerPollDuration());
+ for (final ConsumerRecord<String, String> record : records) {
+ traceIfTraceEnabled(record);
+ eventReceiver.receiveEvent(new Properties(), record.value());
+ }
+ } catch (final Exception e) {
+ LOGGER.warn("error receiving events on thread {}", consumerThread.getName(), e);
}
- } catch (final Exception e) {
- LOGGER.warn("error receiving events on thread {}", consumerThread.getName(), e);
}
}
-
- if (!consumerThread.isInterrupted()) {
- kafkaConsumer.close();
- }
}
/**
@@ -123,7 +106,7 @@ public class ApexKafkaConsumer extends ApexPluginsEventConsumer {
private void traceIfTraceEnabled(final ConsumerRecord<String, String> record) {
if (LOGGER.isTraceEnabled()) {
LOGGER.trace("event received for {} for forwarding to Apex engine : {} {}",
- this.getClass().getName() + ":" + this.name, record.key(), record.value());
+ this.getClass().getName() + ":" + this.name, record.key(), record.value());
}
}
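
The refactored run() above now creates the KafkaConsumer inside a try-with-resources block, so the consumer is closed whenever the poll loop exits rather than relying on the old interrupted-thread check. A minimal standalone sketch of the same pattern follows; the broker address, group id, topic name and handle() callback are illustrative stand-ins, not part of the plugin.

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ClosingConsumerSketch {
    private volatile boolean stopOrdered = false;

    public void run() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // illustrative broker address
        props.put("group.id", "apex-sketch");              // illustrative group id
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());

        // try-with-resources closes the consumer even if poll() or the handler throws
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("apex-in"));
            while (!stopOrdered) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                records.forEach(record -> handle(record.value()));
            }
        }
    }

    private void handle(String event) {
        // stands in for eventReceiver.receiveEvent(new Properties(), record.value())
        System.out.println("received: " + event);
    }
}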
diff --git a/plugins/plugins-executor/plugins-executor-javascript/src/main/java/org/onap/policy/apex/plugins/executor/javascript/JavascriptExecutor.java b/plugins/plugins-executor/plugins-executor-javascript/src/main/java/org/onap/policy/apex/plugins/executor/javascript/JavascriptExecutor.java
index 489489ff0..f70bf580b 100644
--- a/plugins/plugins-executor/plugins-executor-javascript/src/main/java/org/onap/policy/apex/plugins/executor/javascript/JavascriptExecutor.java
+++ b/plugins/plugins-executor/plugins-executor-javascript/src/main/java/org/onap/policy/apex/plugins/executor/javascript/JavascriptExecutor.java
@@ -56,6 +56,7 @@ public class JavascriptExecutor implements Runnable {
// Recurring string constants
private static final String WITH_MESSAGE = " with message: ";
private static final String JAVASCRIPT_EXECUTOR = "JavascriptExecutor ";
+ private static final String EXECUTION_FAILED_EXECUTOR = "execution failed, executor ";
@Setter(AccessLevel.PROTECTED)
private static TimeUnit timeunit4Latches = TimeUnit.SECONDS;
@@ -132,6 +133,11 @@ public class JavascriptExecutor implements Runnable {
Thread.currentThread().interrupt();
}
+ if (executorException.get() != null) {
+ executorThread.interrupt();
+ checkAndThrowExecutorException();
+ }
+
checkAndThrowExecutorException();
LOGGER.debug("JavascriptExecutor {} started ... ", subjectKey.getId());
@@ -146,11 +152,11 @@ public class JavascriptExecutor implements Runnable {
*/
public synchronized boolean execute(final Object executionContext) throws StateMachineException {
if (executorThread == null) {
- throw new StateMachineException("execution failed, executor " + subjectKey.getId() + " is not initialized");
+ throw new StateMachineException(EXECUTION_FAILED_EXECUTOR + subjectKey.getId() + " is not initialized");
}
- if (!executorThread.isAlive()) {
- throw new StateMachineException("execution failed, executor " + subjectKey.getId()
+ if (!executorThread.isAlive() || executorThread.isInterrupted()) {
+ throw new StateMachineException(EXECUTION_FAILED_EXECUTOR + subjectKey.getId()
+ " is not running, run cleanUp to clear executor and init to restart executor");
}
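
With the change above, execute() rejects an interrupted executor thread as well as a dead one, and init() interrupts the thread before rethrowing a start-up failure. A minimal sketch of that liveness guard under hypothetical names (WorkerSketch, submit, and IllegalStateException in place of StateMachineException); it is not the plugin's API.

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class WorkerSketch {
    private final BlockingQueue<Object> queue = new LinkedBlockingQueue<>();
    private Thread worker;

    public void init() {
        worker = new Thread(() -> {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    Object ctx = queue.take(); // blocks until work arrives
                    System.out.println("executing " + ctx);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // restore the flag and exit
            }
        });
        worker.start();
    }

    public synchronized void submit(Object executionContext) {
        if (worker == null) {
            throw new IllegalStateException("execution failed, executor is not initialized");
        }
        // an interrupted thread may still be alive but will never take more work
        if (!worker.isAlive() || worker.isInterrupted()) {
            throw new IllegalStateException(
                "execution failed, executor is not running; run cleanUp and init to restart it");
        }
        queue.add(executionContext);
    }
}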
diff --git a/plugins/plugins-executor/plugins-executor-javascript/src/test/java/org/onap/policy/apex/plugins/executor/javascript/JavascriptExecutorTest.java b/plugins/plugins-executor/plugins-executor-javascript/src/test/java/org/onap/policy/apex/plugins/executor/javascript/JavascriptExecutorTest.java
index 6ea15fc35..56ebf9972 100644
--- a/plugins/plugins-executor/plugins-executor-javascript/src/test/java/org/onap/policy/apex/plugins/executor/javascript/JavascriptExecutorTest.java
+++ b/plugins/plugins-executor/plugins-executor-javascript/src/test/java/org/onap/policy/apex/plugins/executor/javascript/JavascriptExecutorTest.java
@@ -34,8 +34,12 @@ import org.junit.Before;
import org.junit.Test;
import org.onap.policy.apex.core.engine.executor.exception.StateMachineException;
import org.onap.policy.apex.model.basicmodel.concepts.AxArtifactKey;
+import org.slf4j.ext.XLogger;
+import org.slf4j.ext.XLoggerFactory;
public class JavascriptExecutorTest {
+ private static final XLogger LOGGER = XLoggerFactory.getXLogger(JavascriptExecutorTest.class);
+
private AtomicBoolean concurrentResult = new AtomicBoolean();
@Before
@@ -277,12 +281,14 @@ public class JavascriptExecutorTest {
public void run() {
try {
while (executor.execute("hello")) {
+ LOGGER.debug("test thread running . . .");
// Loop until interrupted
}
- concurrentResult.set(false);
} catch (StateMachineException e) {
- // Do nothing
+ LOGGER.debug("test thread caught exception", e);
}
+ concurrentResult.set(false);
+ LOGGER.debug("test thread exited");
}
}).start();
diff --git a/testsuites/integration/integration-uservice-test/src/test/java/org/onap/policy/apex/testsuites/integration/uservice/adapt/kafka/KafkaEventProducer.java b/testsuites/integration/integration-uservice-test/src/test/java/org/onap/policy/apex/testsuites/integration/uservice/adapt/kafka/KafkaEventProducer.java
index c214b72a8..66a928677 100644
--- a/testsuites/integration/integration-uservice-test/src/test/java/org/onap/policy/apex/testsuites/integration/uservice/adapt/kafka/KafkaEventProducer.java
+++ b/testsuites/integration/integration-uservice-test/src/test/java/org/onap/policy/apex/testsuites/integration/uservice/adapt/kafka/KafkaEventProducer.java
@@ -1,7 +1,7 @@
/*-
* ============LICENSE_START=======================================================
* Copyright (C) 2016-2018 Ericsson. All rights reserved.
- * Modifications Copyright (C) 2019 Nordix Foundation.
+ * Modifications Copyright (C) 2019-2020 Nordix Foundation.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -51,7 +51,7 @@ public class KafkaEventProducer implements Runnable {
private final Thread producerThread;
private boolean sendEventsFlag = false;
- private boolean stopFlag = false;
+ private volatile boolean stopFlag = false;
/**
* Instantiates a new kafka event producer.
@@ -63,7 +63,7 @@ public class KafkaEventProducer implements Runnable {
* @param eventInterval the event interval
*/
public KafkaEventProducer(final String topic, final SharedKafkaTestResource sharedKafkaTestResource,
- final int eventCount, final boolean xmlEvents, final long eventInterval) {
+ final int eventCount, final boolean xmlEvents, final long eventInterval) {
this.topic = topic;
this.sharedKafkaTestResource = sharedKafkaTestResource;
this.eventCount = eventCount;
@@ -80,7 +80,7 @@ public class KafkaEventProducer implements Runnable {
@Override
public void run() {
final Producer<String, String> producer = sharedKafkaTestResource.getKafkaTestUtils()
- .getKafkaProducer(StringSerializer.class, StringSerializer.class);
+ .getKafkaProducer(StringSerializer.class, StringSerializer.class);
while (producerThread.isAlive() && !stopFlag) {
ThreadUtilities.sleep(50);
@@ -108,11 +108,11 @@ public class KafkaEventProducer implements Runnable {
*/
private void sendEventsToTopic(final Producer<String, String> producer) {
LOGGER.debug("{} : sending events to Kafka server, event count {}, xmlEvents {}",
- KafkaEventProducer.class.getName(), eventCount, xmlEvents);
+ KafkaEventProducer.class.getName(), eventCount, xmlEvents);
for (int i = 0; i < eventCount; i++) {
LOGGER.debug("{} : waiting {} milliseconds before sending next event", KafkaEventProducer.class.getName(),
- eventInterval);
+ eventInterval);
ThreadUtilities.sleep(eventInterval);
String eventString = null;
@@ -124,7 +124,7 @@ public class KafkaEventProducer implements Runnable {
producer.send(new ProducerRecord<String, String>(topic, "Event" + i + "Of" + eventCount, eventString));
producer.flush();
eventsSentCount++;
- LOGGER.debug("****** Sent event No. {} ******", eventsSentCount);
+ LOGGER.debug("****** Sent event No. {} ******\n{}", eventsSentCount, eventString);
}
LOGGER.debug("{}: completed", KafkaEventProducer.class.getName());
}
@@ -152,4 +152,8 @@ public class KafkaEventProducer implements Runnable {
LOGGER.debug("{} : stopped", KafkaEventProducer.class.getName());
}
+
+ public boolean isAlive() {
+ return producerThread.isAlive();
+ }
}
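
Declaring stopFlag volatile above ensures the producer thread sees a shutdown request issued from another thread without further synchronization. A minimal sketch of the volatile stop-flag pattern; the class and method names are illustrative.

public class StoppableWorker implements Runnable {
    // volatile: a write from the shutdown() caller is visible to the worker thread
    private volatile boolean stopFlag = false;
    private final Thread workerThread = new Thread(this, "stoppable-worker");

    public void start() {
        workerThread.start();
    }

    @Override
    public void run() {
        while (!stopFlag) {
            // do one unit of work, then re-check the flag
        }
    }

    public void shutdown() {
        stopFlag = true;
    }

    public boolean isAlive() {
        return workerThread.isAlive();
    }
}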
diff --git a/testsuites/integration/integration-uservice-test/src/test/java/org/onap/policy/apex/testsuites/integration/uservice/adapt/kafka/KafkaEventSubscriber.java b/testsuites/integration/integration-uservice-test/src/test/java/org/onap/policy/apex/testsuites/integration/uservice/adapt/kafka/KafkaEventSubscriber.java
index 1bb2e8021..68ca87953 100644
--- a/testsuites/integration/integration-uservice-test/src/test/java/org/onap/policy/apex/testsuites/integration/uservice/adapt/kafka/KafkaEventSubscriber.java
+++ b/testsuites/integration/integration-uservice-test/src/test/java/org/onap/policy/apex/testsuites/integration/uservice/adapt/kafka/KafkaEventSubscriber.java
@@ -1,7 +1,7 @@
/*-
* ============LICENSE_START=======================================================
* Copyright (C) 2016-2018 Ericsson. All rights reserved.
- * Modifications Copyright (C) 2019 Nordix Foundation.
+ * Modifications Copyright (C) 2019-2020 Nordix Foundation.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -52,7 +52,7 @@ public class KafkaEventSubscriber implements Runnable {
KafkaConsumer<String, String> consumer;
- Thread subscriberThread;
+ private final Thread subscriberThread;
/**
* Instantiates a new kafka event subscriber.
@@ -61,15 +61,15 @@ public class KafkaEventSubscriber implements Runnable {
* @param sharedKafkaTestResource the kafka server address
* @throws MessagingException the messaging exception
*/
- public KafkaEventSubscriber(final String topic, final SharedKafkaTestResource sharedKafkaTestResource)
- throws MessagingException {
+ public KafkaEventSubscriber(final String topic,
+ final SharedKafkaTestResource sharedKafkaTestResource) throws MessagingException {
this.topic = topic;
final Properties consumerProperties = new Properties();
consumerProperties.put("group.id", "test");
consumer = sharedKafkaTestResource.getKafkaTestUtils().getKafkaConsumer(StringDeserializer.class,
- StringDeserializer.class, consumerProperties);
+ StringDeserializer.class, consumerProperties);
consumer.subscribe(Arrays.asList(topic));
subscriberThread = new Thread(this);
@@ -82,15 +82,15 @@ public class KafkaEventSubscriber implements Runnable {
@Override
public void run() {
LOGGER.debug("{}: receiving events from Kafka server on topic {}", KafkaEventSubscriber.class.getName(),
- topic);
+ topic);
while (subscriberThread.isAlive() && !subscriberThread.isInterrupted()) {
try {
final ConsumerRecords<String, String> records = consumer.poll(POLL_DURATION);
for (final ConsumerRecord<String, String> record : records) {
eventsReceivedCount++;
- LOGGER.debug("****** Received event No. {} ******\noffset={}\nkey={}", eventsReceivedCount,
- record.offset(), record.key());
+ LOGGER.debug("****** Received event No. {} ******\noffset={}\nkey={}\n{}", eventsReceivedCount,
+ record.offset(), record.key(), record.value());
}
} catch (final Exception e) {
// Thread interrupted
@@ -123,4 +123,8 @@ public class KafkaEventSubscriber implements Runnable {
consumer.close();
LOGGER.debug("{} : stopped", KafkaEventSubscriber.class.getName());
}
+
+ public boolean isAlive() {
+ return subscriberThread.isAlive();
+ }
}
diff --git a/testsuites/integration/integration-uservice-test/src/test/java/org/onap/policy/apex/testsuites/integration/uservice/adapt/kafka/TestKafka2Kafka.java b/testsuites/integration/integration-uservice-test/src/test/java/org/onap/policy/apex/testsuites/integration/uservice/adapt/kafka/TestKafka2Kafka.java
index b35b961e4..c3289887f 100644
--- a/testsuites/integration/integration-uservice-test/src/test/java/org/onap/policy/apex/testsuites/integration/uservice/adapt/kafka/TestKafka2Kafka.java
+++ b/testsuites/integration/integration-uservice-test/src/test/java/org/onap/policy/apex/testsuites/integration/uservice/adapt/kafka/TestKafka2Kafka.java
@@ -21,6 +21,7 @@
package org.onap.policy.apex.testsuites.integration.uservice.adapt.kafka;
+import static org.awaitility.Awaitility.await;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
@@ -28,13 +29,11 @@ import com.salesforce.kafka.test.junit4.SharedKafkaTestResource;
import java.io.File;
import java.io.IOException;
+import java.util.concurrent.TimeUnit;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
-import org.onap.policy.apex.core.infrastructure.messaging.MessagingException;
-import org.onap.policy.apex.core.infrastructure.threading.ThreadUtilities;
-import org.onap.policy.apex.model.basicmodel.concepts.ApexException;
import org.onap.policy.apex.service.engine.main.ApexMain;
import org.onap.policy.common.utils.resources.TextFileUtils;
@@ -57,21 +56,20 @@ public class TestKafka2Kafka {
@ClassRule
public static final SharedKafkaTestResource sharedKafkaTestResource = new SharedKafkaTestResource()
- // Start a cluster with 1 brokers.
- .withBrokers(1)
- // Disable topic auto-creation.
- .withBrokerProperty("auto.create.topics.enable", "false");
+ // Start a cluster with 1 broker.
+ .withBrokers(1)
+ // Disable topic auto-creation.
+ .withBrokerProperty("auto.create.topics.enable", "false");
/**
* Test json kafka events.
*
- * @throws MessagingException the messaging exception
- * @throws ApexException the apex exception
+ * @throws Exception on errors
*/
@Test
- public void testJsonKafkaEvents() throws MessagingException, ApexException {
+ public void testJsonKafkaEvents() throws Exception {
final String conditionedConfigFile = getConditionedConfigFile(
- "target" + File.separator + "examples/config/SampleDomain/Kafka2KafkaJsonEvent.json");
+ "target" + File.separator + "examples/config/SampleDomain/Kafka2KafkaJsonEvent.json");
final String[] args = {"-rfr", "target", "-c", conditionedConfigFile};
testKafkaEvents(args, false, "json");
}
@@ -79,13 +77,12 @@ public class TestKafka2Kafka {
/**
* Test XML kafka events.
*
- * @throws MessagingException the messaging exception
- * @throws ApexException the apex exception
+ * @throws Exception on errors
*/
@Test
- public void testXmlKafkaEvents() throws MessagingException, ApexException {
+ public void testXmlKafkaEvents() throws Exception {
final String conditionedConfigFile = getConditionedConfigFile(
- "target" + File.separator + "examples/config/SampleDomain/Kafka2KafkaXmlEvent.json");
+ "target" + File.separator + "examples/config/SampleDomain/Kafka2KafkaXmlEvent.json");
final String[] args = {"-rfr", "target", "-c", conditionedConfigFile};
testKafkaEvents(args, true, "xml");
@@ -97,44 +94,47 @@ public class TestKafka2Kafka {
* @param args the args
* @param xmlEvents the xml events
* @param topicSuffix the topic suffix
- * @throws MessagingException the messaging exception
- * @throws ApexException the apex exception
+ * @throws Exception on errors
*/
- private void testKafkaEvents(String[] args, final Boolean xmlEvents, final String topicSuffix)
- throws MessagingException, ApexException {
+ private void testKafkaEvents(String[] args, final Boolean xmlEvents, final String topicSuffix) throws Exception {
sharedKafkaTestResource.getKafkaTestUtils().createTopic("apex-out-" + topicSuffix, 1, (short) 1);
sharedKafkaTestResource.getKafkaTestUtils().createTopic("apex-in-" + topicSuffix, 1, (short) 1);
final KafkaEventSubscriber subscriber =
- new KafkaEventSubscriber("apex-out-" + topicSuffix, sharedKafkaTestResource);
+ new KafkaEventSubscriber("apex-out-" + topicSuffix, sharedKafkaTestResource);
+
+ await().atMost(30, TimeUnit.SECONDS).until(() -> subscriber.isAlive());
final ApexMain apexMain = new ApexMain(args);
- ThreadUtilities.sleep(3000);
+ await().atMost(10, TimeUnit.SECONDS).until(() -> apexMain.isAlive());
- final KafkaEventProducer producer = new KafkaEventProducer("apex-in-" + topicSuffix, sharedKafkaTestResource,
- EVENT_COUNT, xmlEvents, EVENT_INTERVAL);
+ long initWaitEndTime = System.currentTimeMillis() + 10000;
- producer.sendEvents();
+ await().atMost(12, TimeUnit.SECONDS).until(() -> initWaitEndTime < System.currentTimeMillis());
- final long testStartTime = System.currentTimeMillis();
+ final KafkaEventProducer producer = new KafkaEventProducer("apex-in-" + topicSuffix, sharedKafkaTestResource,
+ EVENT_COUNT, xmlEvents, EVENT_INTERVAL);
- // Wait for the producer to send all tis events
- while (System.currentTimeMillis() < testStartTime + MAX_TEST_LENGTH
- && producer.getEventsSentCount() < EVENT_COUNT) {
- ThreadUtilities.sleep(EVENT_INTERVAL);
- }
+ await().atMost(30, TimeUnit.SECONDS).until(() -> producer.isAlive());
- while (System.currentTimeMillis() < testStartTime + MAX_TEST_LENGTH
- && subscriber.getEventsReceivedCount() < EVENT_COUNT) {
- ThreadUtilities.sleep(EVENT_INTERVAL);
- }
+ producer.sendEvents();
+
+ // Wait for the producer to send all its events
+ await().atMost(MAX_TEST_LENGTH, TimeUnit.MILLISECONDS)
+ .until(() -> producer.getEventsSentCount() >= EVENT_COUNT);
- ThreadUtilities.sleep(3000);
+ await().atMost(MAX_TEST_LENGTH, TimeUnit.MILLISECONDS)
+ .until(() -> subscriber.getEventsReceivedCount() >= EVENT_COUNT);
apexMain.shutdown();
+ await().atMost(30, TimeUnit.SECONDS).until(() -> !apexMain.isAlive());
+
subscriber.shutdown();
+ await().atMost(30, TimeUnit.SECONDS).until(() -> !subscriber.isAlive());
+
producer.shutdown();
+ await().atMost(30, TimeUnit.SECONDS).until(() -> !producer.isAlive());
assertEquals(producer.getEventsSentCount(), subscriber.getEventsReceivedCount());
}
@@ -144,7 +144,7 @@ public class TestKafka2Kafka {
File tempConfigFile = File.createTempFile("Kafka_", ".json");
tempConfigFile.deleteOnExit();
String configAsString = TextFileUtils.getTextFileAsString(configurationFileName)
- .replaceAll("localhost:39902", sharedKafkaTestResource.getKafkaConnectString());
+ .replaceAll("localhost:39902", sharedKafkaTestResource.getKafkaConnectString());
TextFileUtils.putStringAsFile(configAsString, tempConfigFile.getCanonicalFile());
return tempConfigFile.getCanonicalPath();
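
The test now polls for conditions with Awaitility instead of fixed ThreadUtilities.sleep() calls, so it continues as soon as a condition holds and fails with a clear timeout otherwise. A minimal sketch of the await().atMost(...).until(...) idiom; the counter and background thread are illustrative.

import static org.awaitility.Awaitility.await;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class AwaitSketch {
    public static void main(String[] args) {
        AtomicInteger eventsSent = new AtomicInteger();

        // background work that eventually satisfies the condition
        new Thread(() -> {
            for (int i = 0; i < 100; i++) {
                eventsSent.incrementAndGet();
            }
        }).start();

        // poll until the condition is true, or throw after the timeout
        await().atMost(30, TimeUnit.SECONDS).until(() -> eventsSent.get() >= 100);
        System.out.println("all events sent");
    }
}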
diff --git a/testsuites/integration/integration-uservice-test/src/test/java/org/onap/policy/apex/testsuites/integration/uservice/taskparameters/TestTaskParameters.java b/testsuites/integration/integration-uservice-test/src/test/java/org/onap/policy/apex/testsuites/integration/uservice/taskparameters/TestTaskParameters.java
index da93f919d..3d1a8d7bc 100644
--- a/testsuites/integration/integration-uservice-test/src/test/java/org/onap/policy/apex/testsuites/integration/uservice/taskparameters/TestTaskParameters.java
+++ b/testsuites/integration/integration-uservice-test/src/test/java/org/onap/policy/apex/testsuites/integration/uservice/taskparameters/TestTaskParameters.java
@@ -24,9 +24,11 @@ import static org.awaitility.Awaitility.await;
import static org.junit.Assert.assertTrue;
import java.util.concurrent.TimeUnit;
+
import javax.ws.rs.client.Client;
import javax.ws.rs.client.ClientBuilder;
import javax.ws.rs.core.Response;
+
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
@@ -42,10 +44,9 @@ import org.slf4j.ext.XLogger;
import org.slf4j.ext.XLoggerFactory;
/**
- * This class runs integration tests for taskParameters.
- * Task parameters are read from the ApexConfig, and they can be accessed in task logic.
- * In this case, the taskParameters are used to set values in executionProperties.
- * URL dynamically populated using executionProperties is hit and values get updated in
+ * This class runs integration tests for taskParameters. Task parameters are read from the ApexConfig, and they can be
+ * accessed in task logic. In this case, the taskParameters are used to set values in executionProperties. A URL
+ * dynamically populated using executionProperties is invoked, and values get updated in
* {@link RestClientEndpointForTaskParameters} which acts as a temporary server for requests.
*/
public class TestTaskParameters {
@@ -121,8 +122,8 @@ public class TestTaskParameters {
}
/**
- * Test taskParameters with no taskIds.
- * When taskIds are not provided, all taskParameters provided in config will be updated to all tasks.
+ * Test taskParameters with no taskIds. When taskIds are not provided, all taskParameters provided in config will be
+ * applied to all tasks.
*/
@Test
public void testTaskParameters_with_noTaskIds() throws Exception {
@@ -132,8 +133,8 @@ public class TestTaskParameters {
}
/**
- * Test taskParameters with valid taskIds.
- * When valid taskIds are provided, the the taskParameter will be updated in that particular task alone.
+ * Test taskParameters with valid taskIds. When valid taskIds are provided, the taskParameter will be updated in
+ * that particular task alone.
*/
@Test
public void testTaskParameters_with_validTaskIds() throws Exception {
@@ -143,9 +144,9 @@ public class TestTaskParameters {
}
/**
- * Test taskParameters with invalid taskIds.
- * When invalid taskIds are provided, or when a taskParameter assigned to a particular taskId is tried to be
- * accessed in a taskLogic of a different task, such taskParameters won't be accessible in the task
+ * Test taskParameters with invalid taskIds. When invalid taskIds are provided, or when a taskParameter assigned to
+ * a particular taskId is accessed in the taskLogic of a different task, such taskParameters won't be accessible in
+ * the task.
*/
@Test
public void testTaskParameters_with_invalidTaskIds() throws Exception {
@@ -165,7 +166,7 @@ public class TestTaskParameters {
String getDetailsUrl = "http://" + HOST + ":" + PORT + "/TestTaskParametersRest/apex/event/getDetails";
// wait for success response code to be received, until a timeout
- await().atMost(2000, TimeUnit.MILLISECONDS)
+ await().atMost(5, TimeUnit.SECONDS)
.until(() -> 200 == client.target(getDetailsUrl).request("application/json").get().getStatus());
apexMain.shutdown();
Response response = client.target(getDetailsUrl).request("application/json").get();
diff --git a/testsuites/integration/integration-uservice-test/src/test/resources/logback-test.xml b/testsuites/integration/integration-uservice-test/src/test/resources/logback-test.xml
index fbe9ffc29..341a9fdaa 100644
--- a/testsuites/integration/integration-uservice-test/src/test/resources/logback-test.xml
+++ b/testsuites/integration/integration-uservice-test/src/test/resources/logback-test.xml
@@ -35,4 +35,18 @@
<appender-ref ref="STDOUT" />
</root>
+
+ <logger name="org.onap.policy.apex" level="INFO" additivity="false">
+ <appender-ref ref="STDOUT" />
+ </logger>
+
+ <logger name="org.onap.policy.apex.core" level="INFO" additivity="false">
+ <appender-ref ref="STDOUT" />
+ </logger>
+
+ <logger name="org.onap.policy.apex.plugins.executor" level="INFO" additivity="false">
+ <appender-ref ref="STDOUT" />
+ </logger>
+
+
</configuration>