summaryrefslogtreecommitdiffstats
path: root/plugins
diff options
context:
space:
mode:
authorliamfallon <liam.fallon@est.tech>2020-04-02 20:54:52 +0100
committerliamfallon <liam.fallon@est.tech>2020-04-03 19:20:26 +0100
commitf134a5eb8bc9ddb6c1dea1a58d440bfdec6dab5c (patch)
treeac4bb39fac4f45637a23a6a72b7e148689f436f1 /plugins
parent640aaf64a0b28b53a7425c17b9065a46c29d3587 (diff)
Fix failing Kafka tests
All the Kafka components need something near 10 seconds to come up completely. This review tweaks the timing to allow the test Kafka server to come up and to allow the consumers to connect to it. Issue-ID: POLICY-2106 Change-Id: I6dd8ace0848bdc2549e658ef8908b4d85d5ea789 Signed-off-by: liamfallon <liam.fallon@est.tech>
Diffstat (limited to 'plugins')
-rw-r--r--plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumer.java65
-rw-r--r--plugins/plugins-executor/plugins-executor-javascript/src/main/java/org/onap/policy/apex/plugins/executor/javascript/JavascriptExecutor.java12
-rw-r--r--plugins/plugins-executor/plugins-executor-javascript/src/test/java/org/onap/policy/apex/plugins/executor/javascript/JavascriptExecutorTest.java10
3 files changed, 41 insertions, 46 deletions
diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumer.java b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumer.java
index 947dd5466..591f83237 100644
--- a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumer.java
+++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumer.java
@@ -49,70 +49,53 @@ public class ApexKafkaConsumer extends ApexPluginsEventConsumer {
// The event receiver that will receive events from this consumer
private ApexEventReceiver eventReceiver;
- // The Kafka consumer used to receive events using Kafka
- private KafkaConsumer<String, String> kafkaConsumer;
-
/**
* {@inheritDoc}.
*/
@Override
public void init(final String consumerName, final EventHandlerParameters consumerParameters,
- final ApexEventReceiver incomingEventReceiver) throws ApexEventException {
+ final ApexEventReceiver incomingEventReceiver) throws ApexEventException {
this.eventReceiver = incomingEventReceiver;
this.name = consumerName;
// Check and get the Kafka Properties
if (!(consumerParameters.getCarrierTechnologyParameters() instanceof KafkaCarrierTechnologyParameters)) {
- LOGGER.warn("specified consumer properties of type \""
- + consumerParameters.getCarrierTechnologyParameters().getClass().getName()
- + "\" are not applicable to a Kafka consumer");
throw new ApexEventException("specified consumer properties of type \""
- + consumerParameters.getCarrierTechnologyParameters().getClass().getName()
- + "\" are not applicable to a Kafka consumer");
- }
- kafkaConsumerProperties = (KafkaCarrierTechnologyParameters) consumerParameters
- .getCarrierTechnologyParameters();
-
- // Kick off the Kafka consumer
- kafkaConsumer = new KafkaConsumer<>(kafkaConsumerProperties.getKafkaConsumerProperties());
- kafkaConsumer.subscribe(kafkaConsumerProperties.getConsumerTopicListAsCollection());
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("event receiver for " + this.getClass().getName() + ":" + this.name + " subscribed to topics: "
- + kafkaConsumerProperties.getConsumerTopicList());
+ + consumerParameters.getCarrierTechnologyParameters().getClass().getName()
+ + "\" are not applicable to a Kafka consumer");
}
+ kafkaConsumerProperties =
+ (KafkaCarrierTechnologyParameters) consumerParameters.getCarrierTechnologyParameters();
}
-
/**
* {@inheritDoc}.
*/
@Override
public void run() {
// Kick off the Kafka consumer
- kafkaConsumer = new KafkaConsumer<>(kafkaConsumerProperties.getKafkaConsumerProperties());
- kafkaConsumer.subscribe(kafkaConsumerProperties.getConsumerTopicListAsCollection());
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("event receiver for " + this.getClass().getName() + ":" + this.name + " subscribed to topics: "
- + kafkaConsumerProperties.getConsumerTopicList());
- }
+ try (KafkaConsumer<String, String> kafkaConsumer =
+ new KafkaConsumer<>(kafkaConsumerProperties.getKafkaConsumerProperties())) {
+ kafkaConsumer.subscribe(kafkaConsumerProperties.getConsumerTopicListAsCollection());
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("event receiver for {}:{} subscribed to topics: {}", this.getClass().getName(), this.name,
+ kafkaConsumerProperties.getConsumerTopicList());
+ }
- // The endless loop that receives events over Kafka
- while (consumerThread.isAlive() && !stopOrderedFlag) {
- try {
- final ConsumerRecords<String, String> records = kafkaConsumer
- .poll(kafkaConsumerProperties.getConsumerPollDuration());
- for (final ConsumerRecord<String, String> record : records) {
- traceIfTraceEnabled(record);
- eventReceiver.receiveEvent(new Properties(), record.value());
+ // The endless loop that receives events over Kafka
+ while (consumerThread.isAlive() && !stopOrderedFlag) {
+ try {
+ final ConsumerRecords<String, String> records =
+ kafkaConsumer.poll(kafkaConsumerProperties.getConsumerPollDuration());
+ for (final ConsumerRecord<String, String> record : records) {
+ traceIfTraceEnabled(record);
+ eventReceiver.receiveEvent(new Properties(), record.value());
+ }
+ } catch (final Exception e) {
+ LOGGER.warn("error receiving events on thread {}", consumerThread.getName(), e);
}
- } catch (final Exception e) {
- LOGGER.warn("error receiving events on thread {}", consumerThread.getName(), e);
}
}
-
- if (!consumerThread.isInterrupted()) {
- kafkaConsumer.close();
- }
}
/**
@@ -123,7 +106,7 @@ public class ApexKafkaConsumer extends ApexPluginsEventConsumer {
private void traceIfTraceEnabled(final ConsumerRecord<String, String> record) {
if (LOGGER.isTraceEnabled()) {
LOGGER.trace("event received for {} for forwarding to Apex engine : {} {}",
- this.getClass().getName() + ":" + this.name, record.key(), record.value());
+ this.getClass().getName() + ":" + this.name, record.key(), record.value());
}
}
diff --git a/plugins/plugins-executor/plugins-executor-javascript/src/main/java/org/onap/policy/apex/plugins/executor/javascript/JavascriptExecutor.java b/plugins/plugins-executor/plugins-executor-javascript/src/main/java/org/onap/policy/apex/plugins/executor/javascript/JavascriptExecutor.java
index 489489ff0..f70bf580b 100644
--- a/plugins/plugins-executor/plugins-executor-javascript/src/main/java/org/onap/policy/apex/plugins/executor/javascript/JavascriptExecutor.java
+++ b/plugins/plugins-executor/plugins-executor-javascript/src/main/java/org/onap/policy/apex/plugins/executor/javascript/JavascriptExecutor.java
@@ -56,6 +56,7 @@ public class JavascriptExecutor implements Runnable {
// Recurring string constants
private static final String WITH_MESSAGE = " with message: ";
private static final String JAVASCRIPT_EXECUTOR = "JavascriptExecutor ";
+ private static final String EXECUTION_FAILED_EXECUTOR = "execution failed, executor ";
@Setter(AccessLevel.PROTECTED)
private static TimeUnit timeunit4Latches = TimeUnit.SECONDS;
@@ -132,6 +133,11 @@ public class JavascriptExecutor implements Runnable {
Thread.currentThread().interrupt();
}
+ if (executorException.get() != null) {
+ executorThread.interrupt();
+ checkAndThrowExecutorException();
+ }
+
checkAndThrowExecutorException();
LOGGER.debug("JavascriptExecutor {} started ... ", subjectKey.getId());
@@ -146,11 +152,11 @@ public class JavascriptExecutor implements Runnable {
*/
public synchronized boolean execute(final Object executionContext) throws StateMachineException {
if (executorThread == null) {
- throw new StateMachineException("execution failed, executor " + subjectKey.getId() + " is not initialized");
+ throw new StateMachineException(EXECUTION_FAILED_EXECUTOR + subjectKey.getId() + " is not initialized");
}
- if (!executorThread.isAlive()) {
- throw new StateMachineException("execution failed, executor " + subjectKey.getId()
+ if (!executorThread.isAlive() || executorThread.isInterrupted()) {
+ throw new StateMachineException(EXECUTION_FAILED_EXECUTOR + subjectKey.getId()
+ " is not running, run cleanUp to clear executor and init to restart executor");
}
diff --git a/plugins/plugins-executor/plugins-executor-javascript/src/test/java/org/onap/policy/apex/plugins/executor/javascript/JavascriptExecutorTest.java b/plugins/plugins-executor/plugins-executor-javascript/src/test/java/org/onap/policy/apex/plugins/executor/javascript/JavascriptExecutorTest.java
index 6ea15fc35..56ebf9972 100644
--- a/plugins/plugins-executor/plugins-executor-javascript/src/test/java/org/onap/policy/apex/plugins/executor/javascript/JavascriptExecutorTest.java
+++ b/plugins/plugins-executor/plugins-executor-javascript/src/test/java/org/onap/policy/apex/plugins/executor/javascript/JavascriptExecutorTest.java
@@ -34,8 +34,12 @@ import org.junit.Before;
import org.junit.Test;
import org.onap.policy.apex.core.engine.executor.exception.StateMachineException;
import org.onap.policy.apex.model.basicmodel.concepts.AxArtifactKey;
+import org.slf4j.ext.XLogger;
+import org.slf4j.ext.XLoggerFactory;
public class JavascriptExecutorTest {
+ private static final XLogger LOGGER = XLoggerFactory.getXLogger(JavascriptExecutorTest.class);
+
private AtomicBoolean concurrentResult = new AtomicBoolean();
@Before
@@ -277,12 +281,14 @@ public class JavascriptExecutorTest {
public void run() {
try {
while (executor.execute("hello")) {
+ LOGGER.debug("test thread running . . .");
// Loop until interrupted
}
- concurrentResult.set(false);
} catch (StateMachineException e) {
- // Do nothing
+ LOGGER.debug("test thread caught exception", e);
}
+ concurrentResult.set(false);
+ LOGGER.debug("test thread exited");
}
}).start();