diff options
author | liamfallon <liam.fallon@est.tech> | 2020-04-02 20:54:52 +0100 |
---|---|---|
committer | liamfallon <liam.fallon@est.tech> | 2020-04-03 19:20:26 +0100 |
commit | f134a5eb8bc9ddb6c1dea1a58d440bfdec6dab5c (patch) | |
tree | ac4bb39fac4f45637a23a6a72b7e148689f436f1 /plugins | |
parent | 640aaf64a0b28b53a7425c17b9065a46c29d3587 (diff) |
Fix failing Kafka tests
All the Kafka components need something near 10 seconds to come up
completely. This review tweaks the timing to allow the test Kafka server
to come up and to allow the consumers to connect to it.
Issue-ID: POLICY-2106
Change-Id: I6dd8ace0848bdc2549e658ef8908b4d85d5ea789
Signed-off-by: liamfallon <liam.fallon@est.tech>
Diffstat (limited to 'plugins')
3 files changed, 41 insertions, 46 deletions
diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumer.java b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumer.java index 947dd5466..591f83237 100644 --- a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumer.java +++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumer.java @@ -49,70 +49,53 @@ public class ApexKafkaConsumer extends ApexPluginsEventConsumer { // The event receiver that will receive events from this consumer private ApexEventReceiver eventReceiver; - // The Kafka consumer used to receive events using Kafka - private KafkaConsumer<String, String> kafkaConsumer; - /** * {@inheritDoc}. */ @Override public void init(final String consumerName, final EventHandlerParameters consumerParameters, - final ApexEventReceiver incomingEventReceiver) throws ApexEventException { + final ApexEventReceiver incomingEventReceiver) throws ApexEventException { this.eventReceiver = incomingEventReceiver; this.name = consumerName; // Check and get the Kafka Properties if (!(consumerParameters.getCarrierTechnologyParameters() instanceof KafkaCarrierTechnologyParameters)) { - LOGGER.warn("specified consumer properties of type \"" - + consumerParameters.getCarrierTechnologyParameters().getClass().getName() - + "\" are not applicable to a Kafka consumer"); throw new ApexEventException("specified consumer properties of type \"" - + consumerParameters.getCarrierTechnologyParameters().getClass().getName() - + "\" are not applicable to a Kafka consumer"); - } - kafkaConsumerProperties = (KafkaCarrierTechnologyParameters) consumerParameters - .getCarrierTechnologyParameters(); - - // Kick off the Kafka consumer - kafkaConsumer = new KafkaConsumer<>(kafkaConsumerProperties.getKafkaConsumerProperties()); - kafkaConsumer.subscribe(kafkaConsumerProperties.getConsumerTopicListAsCollection()); - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("event receiver for " + this.getClass().getName() + ":" + this.name + " subscribed to topics: " - + kafkaConsumerProperties.getConsumerTopicList()); + + consumerParameters.getCarrierTechnologyParameters().getClass().getName() + + "\" are not applicable to a Kafka consumer"); } + kafkaConsumerProperties = + (KafkaCarrierTechnologyParameters) consumerParameters.getCarrierTechnologyParameters(); } - /** * {@inheritDoc}. */ @Override public void run() { // Kick off the Kafka consumer - kafkaConsumer = new KafkaConsumer<>(kafkaConsumerProperties.getKafkaConsumerProperties()); - kafkaConsumer.subscribe(kafkaConsumerProperties.getConsumerTopicListAsCollection()); - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("event receiver for " + this.getClass().getName() + ":" + this.name + " subscribed to topics: " - + kafkaConsumerProperties.getConsumerTopicList()); - } + try (KafkaConsumer<String, String> kafkaConsumer = + new KafkaConsumer<>(kafkaConsumerProperties.getKafkaConsumerProperties())) { + kafkaConsumer.subscribe(kafkaConsumerProperties.getConsumerTopicListAsCollection()); + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("event receiver for {}:{} subscribed to topics: {}", this.getClass().getName(), this.name, + kafkaConsumerProperties.getConsumerTopicList()); + } - // The endless loop that receives events over Kafka - while (consumerThread.isAlive() && !stopOrderedFlag) { - try { - final ConsumerRecords<String, String> records = kafkaConsumer - .poll(kafkaConsumerProperties.getConsumerPollDuration()); - for (final ConsumerRecord<String, String> record : records) { - traceIfTraceEnabled(record); - eventReceiver.receiveEvent(new Properties(), record.value()); + // The endless loop that receives events over Kafka + while (consumerThread.isAlive() && !stopOrderedFlag) { + try { + final ConsumerRecords<String, String> records = + kafkaConsumer.poll(kafkaConsumerProperties.getConsumerPollDuration()); + for (final ConsumerRecord<String, String> record : records) { + traceIfTraceEnabled(record); + eventReceiver.receiveEvent(new Properties(), record.value()); + } + } catch (final Exception e) { + LOGGER.warn("error receiving events on thread {}", consumerThread.getName(), e); } - } catch (final Exception e) { - LOGGER.warn("error receiving events on thread {}", consumerThread.getName(), e); } } - - if (!consumerThread.isInterrupted()) { - kafkaConsumer.close(); - } } /** @@ -123,7 +106,7 @@ public class ApexKafkaConsumer extends ApexPluginsEventConsumer { private void traceIfTraceEnabled(final ConsumerRecord<String, String> record) { if (LOGGER.isTraceEnabled()) { LOGGER.trace("event received for {} for forwarding to Apex engine : {} {}", - this.getClass().getName() + ":" + this.name, record.key(), record.value()); + this.getClass().getName() + ":" + this.name, record.key(), record.value()); } } diff --git a/plugins/plugins-executor/plugins-executor-javascript/src/main/java/org/onap/policy/apex/plugins/executor/javascript/JavascriptExecutor.java b/plugins/plugins-executor/plugins-executor-javascript/src/main/java/org/onap/policy/apex/plugins/executor/javascript/JavascriptExecutor.java index 489489ff0..f70bf580b 100644 --- a/plugins/plugins-executor/plugins-executor-javascript/src/main/java/org/onap/policy/apex/plugins/executor/javascript/JavascriptExecutor.java +++ b/plugins/plugins-executor/plugins-executor-javascript/src/main/java/org/onap/policy/apex/plugins/executor/javascript/JavascriptExecutor.java @@ -56,6 +56,7 @@ public class JavascriptExecutor implements Runnable { // Recurring string constants private static final String WITH_MESSAGE = " with message: "; private static final String JAVASCRIPT_EXECUTOR = "JavascriptExecutor "; + private static final String EXECUTION_FAILED_EXECUTOR = "execution failed, executor "; @Setter(AccessLevel.PROTECTED) private static TimeUnit timeunit4Latches = TimeUnit.SECONDS; @@ -132,6 +133,11 @@ public class JavascriptExecutor implements Runnable { Thread.currentThread().interrupt(); } + if (executorException.get() != null) { + executorThread.interrupt(); + checkAndThrowExecutorException(); + } + checkAndThrowExecutorException(); LOGGER.debug("JavascriptExecutor {} started ... ", subjectKey.getId()); @@ -146,11 +152,11 @@ public class JavascriptExecutor implements Runnable { */ public synchronized boolean execute(final Object executionContext) throws StateMachineException { if (executorThread == null) { - throw new StateMachineException("execution failed, executor " + subjectKey.getId() + " is not initialized"); + throw new StateMachineException(EXECUTION_FAILED_EXECUTOR + subjectKey.getId() + " is not initialized"); } - if (!executorThread.isAlive()) { - throw new StateMachineException("execution failed, executor " + subjectKey.getId() + if (!executorThread.isAlive() || executorThread.isInterrupted()) { + throw new StateMachineException(EXECUTION_FAILED_EXECUTOR + subjectKey.getId() + " is not running, run cleanUp to clear executor and init to restart executor"); } diff --git a/plugins/plugins-executor/plugins-executor-javascript/src/test/java/org/onap/policy/apex/plugins/executor/javascript/JavascriptExecutorTest.java b/plugins/plugins-executor/plugins-executor-javascript/src/test/java/org/onap/policy/apex/plugins/executor/javascript/JavascriptExecutorTest.java index 6ea15fc35..56ebf9972 100644 --- a/plugins/plugins-executor/plugins-executor-javascript/src/test/java/org/onap/policy/apex/plugins/executor/javascript/JavascriptExecutorTest.java +++ b/plugins/plugins-executor/plugins-executor-javascript/src/test/java/org/onap/policy/apex/plugins/executor/javascript/JavascriptExecutorTest.java @@ -34,8 +34,12 @@ import org.junit.Before; import org.junit.Test; import org.onap.policy.apex.core.engine.executor.exception.StateMachineException; import org.onap.policy.apex.model.basicmodel.concepts.AxArtifactKey; +import org.slf4j.ext.XLogger; +import org.slf4j.ext.XLoggerFactory; public class JavascriptExecutorTest { + private static final XLogger LOGGER = XLoggerFactory.getXLogger(JavascriptExecutorTest.class); + private AtomicBoolean concurrentResult = new AtomicBoolean(); @Before @@ -277,12 +281,14 @@ public class JavascriptExecutorTest { public void run() { try { while (executor.execute("hello")) { + LOGGER.debug("test thread running . . ."); // Loop until interrupted } - concurrentResult.set(false); } catch (StateMachineException e) { - // Do nothing + LOGGER.debug("test thread caught exception", e); } + concurrentResult.set(false); + LOGGER.debug("test thread exited"); } }).start(); |