From f134a5eb8bc9ddb6c1dea1a58d440bfdec6dab5c Mon Sep 17 00:00:00 2001 From: liamfallon Date: Thu, 2 Apr 2020 20:54:52 +0100 Subject: Fix failing Kafka tests All the Kafka components need something near 10 seconds to come up completely. This review tweaks the timing to allow the test Kafka server to come up and to allow the consumers to connect to it. Issue-ID: POLICY-2106 Change-Id: I6dd8ace0848bdc2549e658ef8908b4d85d5ea789 Signed-off-by: liamfallon --- .../event/carrier/kafka/ApexKafkaConsumer.java | 65 ++++++++-------------- .../executor/javascript/JavascriptExecutor.java | 12 +++- .../javascript/JavascriptExecutorTest.java | 10 +++- 3 files changed, 41 insertions(+), 46 deletions(-) (limited to 'plugins') diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumer.java b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumer.java index 947dd5466..591f83237 100644 --- a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumer.java +++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumer.java @@ -49,70 +49,53 @@ public class ApexKafkaConsumer extends ApexPluginsEventConsumer { // The event receiver that will receive events from this consumer private ApexEventReceiver eventReceiver; - // The Kafka consumer used to receive events using Kafka - private KafkaConsumer kafkaConsumer; - /** * {@inheritDoc}. */ @Override public void init(final String consumerName, final EventHandlerParameters consumerParameters, - final ApexEventReceiver incomingEventReceiver) throws ApexEventException { + final ApexEventReceiver incomingEventReceiver) throws ApexEventException { this.eventReceiver = incomingEventReceiver; this.name = consumerName; // Check and get the Kafka Properties if (!(consumerParameters.getCarrierTechnologyParameters() instanceof KafkaCarrierTechnologyParameters)) { - LOGGER.warn("specified consumer properties of type \"" - + consumerParameters.getCarrierTechnologyParameters().getClass().getName() - + "\" are not applicable to a Kafka consumer"); throw new ApexEventException("specified consumer properties of type \"" - + consumerParameters.getCarrierTechnologyParameters().getClass().getName() - + "\" are not applicable to a Kafka consumer"); - } - kafkaConsumerProperties = (KafkaCarrierTechnologyParameters) consumerParameters - .getCarrierTechnologyParameters(); - - // Kick off the Kafka consumer - kafkaConsumer = new KafkaConsumer<>(kafkaConsumerProperties.getKafkaConsumerProperties()); - kafkaConsumer.subscribe(kafkaConsumerProperties.getConsumerTopicListAsCollection()); - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("event receiver for " + this.getClass().getName() + ":" + this.name + " subscribed to topics: " - + kafkaConsumerProperties.getConsumerTopicList()); + + consumerParameters.getCarrierTechnologyParameters().getClass().getName() + + "\" are not applicable to a Kafka consumer"); } + kafkaConsumerProperties = + (KafkaCarrierTechnologyParameters) consumerParameters.getCarrierTechnologyParameters(); } - /** * {@inheritDoc}. */ @Override public void run() { // Kick off the Kafka consumer - kafkaConsumer = new KafkaConsumer<>(kafkaConsumerProperties.getKafkaConsumerProperties()); - kafkaConsumer.subscribe(kafkaConsumerProperties.getConsumerTopicListAsCollection()); - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("event receiver for " + this.getClass().getName() + ":" + this.name + " subscribed to topics: " - + kafkaConsumerProperties.getConsumerTopicList()); - } + try (KafkaConsumer kafkaConsumer = + new KafkaConsumer<>(kafkaConsumerProperties.getKafkaConsumerProperties())) { + kafkaConsumer.subscribe(kafkaConsumerProperties.getConsumerTopicListAsCollection()); + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("event receiver for {}:{} subscribed to topics: {}", this.getClass().getName(), this.name, + kafkaConsumerProperties.getConsumerTopicList()); + } - // The endless loop that receives events over Kafka - while (consumerThread.isAlive() && !stopOrderedFlag) { - try { - final ConsumerRecords records = kafkaConsumer - .poll(kafkaConsumerProperties.getConsumerPollDuration()); - for (final ConsumerRecord record : records) { - traceIfTraceEnabled(record); - eventReceiver.receiveEvent(new Properties(), record.value()); + // The endless loop that receives events over Kafka + while (consumerThread.isAlive() && !stopOrderedFlag) { + try { + final ConsumerRecords records = + kafkaConsumer.poll(kafkaConsumerProperties.getConsumerPollDuration()); + for (final ConsumerRecord record : records) { + traceIfTraceEnabled(record); + eventReceiver.receiveEvent(new Properties(), record.value()); + } + } catch (final Exception e) { + LOGGER.warn("error receiving events on thread {}", consumerThread.getName(), e); } - } catch (final Exception e) { - LOGGER.warn("error receiving events on thread {}", consumerThread.getName(), e); } } - - if (!consumerThread.isInterrupted()) { - kafkaConsumer.close(); - } } /** @@ -123,7 +106,7 @@ public class ApexKafkaConsumer extends ApexPluginsEventConsumer { private void traceIfTraceEnabled(final ConsumerRecord record) { if (LOGGER.isTraceEnabled()) { LOGGER.trace("event received for {} for forwarding to Apex engine : {} {}", - this.getClass().getName() + ":" + this.name, record.key(), record.value()); + this.getClass().getName() + ":" + this.name, record.key(), record.value()); } } diff --git a/plugins/plugins-executor/plugins-executor-javascript/src/main/java/org/onap/policy/apex/plugins/executor/javascript/JavascriptExecutor.java b/plugins/plugins-executor/plugins-executor-javascript/src/main/java/org/onap/policy/apex/plugins/executor/javascript/JavascriptExecutor.java index 489489ff0..f70bf580b 100644 --- a/plugins/plugins-executor/plugins-executor-javascript/src/main/java/org/onap/policy/apex/plugins/executor/javascript/JavascriptExecutor.java +++ b/plugins/plugins-executor/plugins-executor-javascript/src/main/java/org/onap/policy/apex/plugins/executor/javascript/JavascriptExecutor.java @@ -56,6 +56,7 @@ public class JavascriptExecutor implements Runnable { // Recurring string constants private static final String WITH_MESSAGE = " with message: "; private static final String JAVASCRIPT_EXECUTOR = "JavascriptExecutor "; + private static final String EXECUTION_FAILED_EXECUTOR = "execution failed, executor "; @Setter(AccessLevel.PROTECTED) private static TimeUnit timeunit4Latches = TimeUnit.SECONDS; @@ -132,6 +133,11 @@ public class JavascriptExecutor implements Runnable { Thread.currentThread().interrupt(); } + if (executorException.get() != null) { + executorThread.interrupt(); + checkAndThrowExecutorException(); + } + checkAndThrowExecutorException(); LOGGER.debug("JavascriptExecutor {} started ... ", subjectKey.getId()); @@ -146,11 +152,11 @@ public class JavascriptExecutor implements Runnable { */ public synchronized boolean execute(final Object executionContext) throws StateMachineException { if (executorThread == null) { - throw new StateMachineException("execution failed, executor " + subjectKey.getId() + " is not initialized"); + throw new StateMachineException(EXECUTION_FAILED_EXECUTOR + subjectKey.getId() + " is not initialized"); } - if (!executorThread.isAlive()) { - throw new StateMachineException("execution failed, executor " + subjectKey.getId() + if (!executorThread.isAlive() || executorThread.isInterrupted()) { + throw new StateMachineException(EXECUTION_FAILED_EXECUTOR + subjectKey.getId() + " is not running, run cleanUp to clear executor and init to restart executor"); } diff --git a/plugins/plugins-executor/plugins-executor-javascript/src/test/java/org/onap/policy/apex/plugins/executor/javascript/JavascriptExecutorTest.java b/plugins/plugins-executor/plugins-executor-javascript/src/test/java/org/onap/policy/apex/plugins/executor/javascript/JavascriptExecutorTest.java index 6ea15fc35..56ebf9972 100644 --- a/plugins/plugins-executor/plugins-executor-javascript/src/test/java/org/onap/policy/apex/plugins/executor/javascript/JavascriptExecutorTest.java +++ b/plugins/plugins-executor/plugins-executor-javascript/src/test/java/org/onap/policy/apex/plugins/executor/javascript/JavascriptExecutorTest.java @@ -34,8 +34,12 @@ import org.junit.Before; import org.junit.Test; import org.onap.policy.apex.core.engine.executor.exception.StateMachineException; import org.onap.policy.apex.model.basicmodel.concepts.AxArtifactKey; +import org.slf4j.ext.XLogger; +import org.slf4j.ext.XLoggerFactory; public class JavascriptExecutorTest { + private static final XLogger LOGGER = XLoggerFactory.getXLogger(JavascriptExecutorTest.class); + private AtomicBoolean concurrentResult = new AtomicBoolean(); @Before @@ -277,12 +281,14 @@ public class JavascriptExecutorTest { public void run() { try { while (executor.execute("hello")) { + LOGGER.debug("test thread running . . ."); // Loop until interrupted } - concurrentResult.set(false); } catch (StateMachineException e) { - // Do nothing + LOGGER.debug("test thread caught exception", e); } + concurrentResult.set(false); + LOGGER.debug("test thread exited"); } }).start(); -- cgit 1.2.3-korg