summaryrefslogtreecommitdiffstats
path: root/plugins
diff options
context:
space:
mode:
Diffstat (limited to 'plugins')
-rw-r--r--plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumer.java65
-rw-r--r--plugins/plugins-executor/plugins-executor-javascript/src/main/java/org/onap/policy/apex/plugins/executor/javascript/JavascriptExecutor.java12
-rw-r--r--plugins/plugins-executor/plugins-executor-javascript/src/test/java/org/onap/policy/apex/plugins/executor/javascript/JavascriptExecutorTest.java10
3 files changed, 41 insertions, 46 deletions
diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumer.java b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumer.java
index 947dd5466..591f83237 100644
--- a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumer.java
+++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumer.java
@@ -49,70 +49,53 @@ public class ApexKafkaConsumer extends ApexPluginsEventConsumer {
// The event receiver that will receive events from this consumer
private ApexEventReceiver eventReceiver;
- // The Kafka consumer used to receive events using Kafka
- private KafkaConsumer<String, String> kafkaConsumer;
-
/**
* {@inheritDoc}.
*/
@Override
public void init(final String consumerName, final EventHandlerParameters consumerParameters,
- final ApexEventReceiver incomingEventReceiver) throws ApexEventException {
+ final ApexEventReceiver incomingEventReceiver) throws ApexEventException {
this.eventReceiver = incomingEventReceiver;
this.name = consumerName;
// Check and get the Kafka Properties
if (!(consumerParameters.getCarrierTechnologyParameters() instanceof KafkaCarrierTechnologyParameters)) {
- LOGGER.warn("specified consumer properties of type \""
- + consumerParameters.getCarrierTechnologyParameters().getClass().getName()
- + "\" are not applicable to a Kafka consumer");
throw new ApexEventException("specified consumer properties of type \""
- + consumerParameters.getCarrierTechnologyParameters().getClass().getName()
- + "\" are not applicable to a Kafka consumer");
- }
- kafkaConsumerProperties = (KafkaCarrierTechnologyParameters) consumerParameters
- .getCarrierTechnologyParameters();
-
- // Kick off the Kafka consumer
- kafkaConsumer = new KafkaConsumer<>(kafkaConsumerProperties.getKafkaConsumerProperties());
- kafkaConsumer.subscribe(kafkaConsumerProperties.getConsumerTopicListAsCollection());
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("event receiver for " + this.getClass().getName() + ":" + this.name + " subscribed to topics: "
- + kafkaConsumerProperties.getConsumerTopicList());
+ + consumerParameters.getCarrierTechnologyParameters().getClass().getName()
+ + "\" are not applicable to a Kafka consumer");
}
+ kafkaConsumerProperties =
+ (KafkaCarrierTechnologyParameters) consumerParameters.getCarrierTechnologyParameters();
}
-
/**
* {@inheritDoc}.
*/
@Override
public void run() {
// Kick off the Kafka consumer
- kafkaConsumer = new KafkaConsumer<>(kafkaConsumerProperties.getKafkaConsumerProperties());
- kafkaConsumer.subscribe(kafkaConsumerProperties.getConsumerTopicListAsCollection());
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("event receiver for " + this.getClass().getName() + ":" + this.name + " subscribed to topics: "
- + kafkaConsumerProperties.getConsumerTopicList());
- }
+ try (KafkaConsumer<String, String> kafkaConsumer =
+ new KafkaConsumer<>(kafkaConsumerProperties.getKafkaConsumerProperties())) {
+ kafkaConsumer.subscribe(kafkaConsumerProperties.getConsumerTopicListAsCollection());
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("event receiver for {}:{} subscribed to topics: {}", this.getClass().getName(), this.name,
+ kafkaConsumerProperties.getConsumerTopicList());
+ }
- // The endless loop that receives events over Kafka
- while (consumerThread.isAlive() && !stopOrderedFlag) {
- try {
- final ConsumerRecords<String, String> records = kafkaConsumer
- .poll(kafkaConsumerProperties.getConsumerPollDuration());
- for (final ConsumerRecord<String, String> record : records) {
- traceIfTraceEnabled(record);
- eventReceiver.receiveEvent(new Properties(), record.value());
+ // The endless loop that receives events over Kafka
+ while (consumerThread.isAlive() && !stopOrderedFlag) {
+ try {
+ final ConsumerRecords<String, String> records =
+ kafkaConsumer.poll(kafkaConsumerProperties.getConsumerPollDuration());
+ for (final ConsumerRecord<String, String> record : records) {
+ traceIfTraceEnabled(record);
+ eventReceiver.receiveEvent(new Properties(), record.value());
+ }
+ } catch (final Exception e) {
+ LOGGER.warn("error receiving events on thread {}", consumerThread.getName(), e);
}
- } catch (final Exception e) {
- LOGGER.warn("error receiving events on thread {}", consumerThread.getName(), e);
}
}
-
- if (!consumerThread.isInterrupted()) {
- kafkaConsumer.close();
- }
}
/**
@@ -123,7 +106,7 @@ public class ApexKafkaConsumer extends ApexPluginsEventConsumer {
private void traceIfTraceEnabled(final ConsumerRecord<String, String> record) {
if (LOGGER.isTraceEnabled()) {
LOGGER.trace("event received for {} for forwarding to Apex engine : {} {}",
- this.getClass().getName() + ":" + this.name, record.key(), record.value());
+ this.getClass().getName() + ":" + this.name, record.key(), record.value());
}
}
diff --git a/plugins/plugins-executor/plugins-executor-javascript/src/main/java/org/onap/policy/apex/plugins/executor/javascript/JavascriptExecutor.java b/plugins/plugins-executor/plugins-executor-javascript/src/main/java/org/onap/policy/apex/plugins/executor/javascript/JavascriptExecutor.java
index 489489ff0..f70bf580b 100644
--- a/plugins/plugins-executor/plugins-executor-javascript/src/main/java/org/onap/policy/apex/plugins/executor/javascript/JavascriptExecutor.java
+++ b/plugins/plugins-executor/plugins-executor-javascript/src/main/java/org/onap/policy/apex/plugins/executor/javascript/JavascriptExecutor.java
@@ -56,6 +56,7 @@ public class JavascriptExecutor implements Runnable {
// Recurring string constants
private static final String WITH_MESSAGE = " with message: ";
private static final String JAVASCRIPT_EXECUTOR = "JavascriptExecutor ";
+ private static final String EXECUTION_FAILED_EXECUTOR = "execution failed, executor ";
@Setter(AccessLevel.PROTECTED)
private static TimeUnit timeunit4Latches = TimeUnit.SECONDS;
@@ -132,6 +133,11 @@ public class JavascriptExecutor implements Runnable {
Thread.currentThread().interrupt();
}
+ if (executorException.get() != null) {
+ executorThread.interrupt();
+ checkAndThrowExecutorException();
+ }
+
checkAndThrowExecutorException();
LOGGER.debug("JavascriptExecutor {} started ... ", subjectKey.getId());
@@ -146,11 +152,11 @@ public class JavascriptExecutor implements Runnable {
*/
public synchronized boolean execute(final Object executionContext) throws StateMachineException {
if (executorThread == null) {
- throw new StateMachineException("execution failed, executor " + subjectKey.getId() + " is not initialized");
+ throw new StateMachineException(EXECUTION_FAILED_EXECUTOR + subjectKey.getId() + " is not initialized");
}
- if (!executorThread.isAlive()) {
- throw new StateMachineException("execution failed, executor " + subjectKey.getId()
+ if (!executorThread.isAlive() || executorThread.isInterrupted()) {
+ throw new StateMachineException(EXECUTION_FAILED_EXECUTOR + subjectKey.getId()
+ " is not running, run cleanUp to clear executor and init to restart executor");
}
diff --git a/plugins/plugins-executor/plugins-executor-javascript/src/test/java/org/onap/policy/apex/plugins/executor/javascript/JavascriptExecutorTest.java b/plugins/plugins-executor/plugins-executor-javascript/src/test/java/org/onap/policy/apex/plugins/executor/javascript/JavascriptExecutorTest.java
index 6ea15fc35..56ebf9972 100644
--- a/plugins/plugins-executor/plugins-executor-javascript/src/test/java/org/onap/policy/apex/plugins/executor/javascript/JavascriptExecutorTest.java
+++ b/plugins/plugins-executor/plugins-executor-javascript/src/test/java/org/onap/policy/apex/plugins/executor/javascript/JavascriptExecutorTest.java
@@ -34,8 +34,12 @@ import org.junit.Before;
import org.junit.Test;
import org.onap.policy.apex.core.engine.executor.exception.StateMachineException;
import org.onap.policy.apex.model.basicmodel.concepts.AxArtifactKey;
+import org.slf4j.ext.XLogger;
+import org.slf4j.ext.XLoggerFactory;
public class JavascriptExecutorTest {
+ private static final XLogger LOGGER = XLoggerFactory.getXLogger(JavascriptExecutorTest.class);
+
private AtomicBoolean concurrentResult = new AtomicBoolean();
@Before
@@ -277,12 +281,14 @@ public class JavascriptExecutorTest {
public void run() {
try {
while (executor.execute("hello")) {
+ LOGGER.debug("test thread running . . .");
// Loop until interrupted
}
- concurrentResult.set(false);
} catch (StateMachineException e) {
- // Do nothing
+ LOGGER.debug("test thread caught exception", e);
}
+ concurrentResult.set(false);
+ LOGGER.debug("test thread exited");
}
}).start();