diff options
Diffstat (limited to 'plugins')
3 files changed, 41 insertions, 46 deletions
diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumer.java b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumer.java index 947dd5466..591f83237 100644 --- a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumer.java +++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumer.java @@ -49,70 +49,53 @@ public class ApexKafkaConsumer extends ApexPluginsEventConsumer { // The event receiver that will receive events from this consumer private ApexEventReceiver eventReceiver; - // The Kafka consumer used to receive events using Kafka - private KafkaConsumer<String, String> kafkaConsumer; - /** * {@inheritDoc}. */ @Override public void init(final String consumerName, final EventHandlerParameters consumerParameters, - final ApexEventReceiver incomingEventReceiver) throws ApexEventException { + final ApexEventReceiver incomingEventReceiver) throws ApexEventException { this.eventReceiver = incomingEventReceiver; this.name = consumerName; // Check and get the Kafka Properties if (!(consumerParameters.getCarrierTechnologyParameters() instanceof KafkaCarrierTechnologyParameters)) { - LOGGER.warn("specified consumer properties of type \"" - + consumerParameters.getCarrierTechnologyParameters().getClass().getName() - + "\" are not applicable to a Kafka consumer"); throw new ApexEventException("specified consumer properties of type \"" - + consumerParameters.getCarrierTechnologyParameters().getClass().getName() - + "\" are not applicable to a Kafka consumer"); - } - kafkaConsumerProperties = (KafkaCarrierTechnologyParameters) consumerParameters - .getCarrierTechnologyParameters(); - - // Kick off the Kafka consumer - kafkaConsumer = new KafkaConsumer<>(kafkaConsumerProperties.getKafkaConsumerProperties()); - kafkaConsumer.subscribe(kafkaConsumerProperties.getConsumerTopicListAsCollection()); - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("event receiver for " + this.getClass().getName() + ":" + this.name + " subscribed to topics: " - + kafkaConsumerProperties.getConsumerTopicList()); + + consumerParameters.getCarrierTechnologyParameters().getClass().getName() + + "\" are not applicable to a Kafka consumer"); } + kafkaConsumerProperties = + (KafkaCarrierTechnologyParameters) consumerParameters.getCarrierTechnologyParameters(); } - /** * {@inheritDoc}. */ @Override public void run() { // Kick off the Kafka consumer - kafkaConsumer = new KafkaConsumer<>(kafkaConsumerProperties.getKafkaConsumerProperties()); - kafkaConsumer.subscribe(kafkaConsumerProperties.getConsumerTopicListAsCollection()); - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("event receiver for " + this.getClass().getName() + ":" + this.name + " subscribed to topics: " - + kafkaConsumerProperties.getConsumerTopicList()); - } + try (KafkaConsumer<String, String> kafkaConsumer = + new KafkaConsumer<>(kafkaConsumerProperties.getKafkaConsumerProperties())) { + kafkaConsumer.subscribe(kafkaConsumerProperties.getConsumerTopicListAsCollection()); + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("event receiver for {}:{} subscribed to topics: {}", this.getClass().getName(), this.name, + kafkaConsumerProperties.getConsumerTopicList()); + } - // The endless loop that receives events over Kafka - while (consumerThread.isAlive() && !stopOrderedFlag) { - try { - final ConsumerRecords<String, String> records = kafkaConsumer - .poll(kafkaConsumerProperties.getConsumerPollDuration()); - for (final ConsumerRecord<String, String> record : records) { - traceIfTraceEnabled(record); - eventReceiver.receiveEvent(new Properties(), record.value()); + // The endless loop that receives events over Kafka + while (consumerThread.isAlive() && !stopOrderedFlag) { + try { + final ConsumerRecords<String, String> records = + kafkaConsumer.poll(kafkaConsumerProperties.getConsumerPollDuration()); + for (final ConsumerRecord<String, String> record : records) { + traceIfTraceEnabled(record); + eventReceiver.receiveEvent(new Properties(), record.value()); + } + } catch (final Exception e) { + LOGGER.warn("error receiving events on thread {}", consumerThread.getName(), e); } - } catch (final Exception e) { - LOGGER.warn("error receiving events on thread {}", consumerThread.getName(), e); } } - - if (!consumerThread.isInterrupted()) { - kafkaConsumer.close(); - } } /** @@ -123,7 +106,7 @@ public class ApexKafkaConsumer extends ApexPluginsEventConsumer { private void traceIfTraceEnabled(final ConsumerRecord<String, String> record) { if (LOGGER.isTraceEnabled()) { LOGGER.trace("event received for {} for forwarding to Apex engine : {} {}", - this.getClass().getName() + ":" + this.name, record.key(), record.value()); + this.getClass().getName() + ":" + this.name, record.key(), record.value()); } } diff --git a/plugins/plugins-executor/plugins-executor-javascript/src/main/java/org/onap/policy/apex/plugins/executor/javascript/JavascriptExecutor.java b/plugins/plugins-executor/plugins-executor-javascript/src/main/java/org/onap/policy/apex/plugins/executor/javascript/JavascriptExecutor.java index 489489ff0..f70bf580b 100644 --- a/plugins/plugins-executor/plugins-executor-javascript/src/main/java/org/onap/policy/apex/plugins/executor/javascript/JavascriptExecutor.java +++ b/plugins/plugins-executor/plugins-executor-javascript/src/main/java/org/onap/policy/apex/plugins/executor/javascript/JavascriptExecutor.java @@ -56,6 +56,7 @@ public class JavascriptExecutor implements Runnable { // Recurring string constants private static final String WITH_MESSAGE = " with message: "; private static final String JAVASCRIPT_EXECUTOR = "JavascriptExecutor "; + private static final String EXECUTION_FAILED_EXECUTOR = "execution failed, executor "; @Setter(AccessLevel.PROTECTED) private static TimeUnit timeunit4Latches = TimeUnit.SECONDS; @@ -132,6 +133,11 @@ public class JavascriptExecutor implements Runnable { Thread.currentThread().interrupt(); } + if (executorException.get() != null) { + executorThread.interrupt(); + checkAndThrowExecutorException(); + } + checkAndThrowExecutorException(); LOGGER.debug("JavascriptExecutor {} started ... ", subjectKey.getId()); @@ -146,11 +152,11 @@ public class JavascriptExecutor implements Runnable { */ public synchronized boolean execute(final Object executionContext) throws StateMachineException { if (executorThread == null) { - throw new StateMachineException("execution failed, executor " + subjectKey.getId() + " is not initialized"); + throw new StateMachineException(EXECUTION_FAILED_EXECUTOR + subjectKey.getId() + " is not initialized"); } - if (!executorThread.isAlive()) { - throw new StateMachineException("execution failed, executor " + subjectKey.getId() + if (!executorThread.isAlive() || executorThread.isInterrupted()) { + throw new StateMachineException(EXECUTION_FAILED_EXECUTOR + subjectKey.getId() + " is not running, run cleanUp to clear executor and init to restart executor"); } diff --git a/plugins/plugins-executor/plugins-executor-javascript/src/test/java/org/onap/policy/apex/plugins/executor/javascript/JavascriptExecutorTest.java b/plugins/plugins-executor/plugins-executor-javascript/src/test/java/org/onap/policy/apex/plugins/executor/javascript/JavascriptExecutorTest.java index 6ea15fc35..56ebf9972 100644 --- a/plugins/plugins-executor/plugins-executor-javascript/src/test/java/org/onap/policy/apex/plugins/executor/javascript/JavascriptExecutorTest.java +++ b/plugins/plugins-executor/plugins-executor-javascript/src/test/java/org/onap/policy/apex/plugins/executor/javascript/JavascriptExecutorTest.java @@ -34,8 +34,12 @@ import org.junit.Before; import org.junit.Test; import org.onap.policy.apex.core.engine.executor.exception.StateMachineException; import org.onap.policy.apex.model.basicmodel.concepts.AxArtifactKey; +import org.slf4j.ext.XLogger; +import org.slf4j.ext.XLoggerFactory; public class JavascriptExecutorTest { + private static final XLogger LOGGER = XLoggerFactory.getXLogger(JavascriptExecutorTest.class); + private AtomicBoolean concurrentResult = new AtomicBoolean(); @Before @@ -277,12 +281,14 @@ public class JavascriptExecutorTest { public void run() { try { while (executor.execute("hello")) { + LOGGER.debug("test thread running . . ."); // Loop until interrupted } - concurrentResult.set(false); } catch (StateMachineException e) { - // Do nothing + LOGGER.debug("test thread caught exception", e); } + concurrentResult.set(false); + LOGGER.debug("test thread exited"); } }).start(); |