From ce9d82d2c0e863597d84cc8909955e398405f45a Mon Sep 17 00:00:00 2001 From: liamfallon Date: Wed, 26 Jun 2019 15:40:41 +0000 Subject: Add passthrough properties for APEX engine APEX event receiver and sender plugins sometimes need to exchange information with tasks, especially in the case of REST communication. This change enables passthrough of Properties from the event carrier technology plugins to APEX task, task selection, and state finalizer logics. Apologies for the size of the review but this change involves passinng the properties through all the APEX components, hence the large number of small changes. Issue-ID: POLICY-1742 Change-Id: I219fd69550f06702ef64adbb165fe7baac422e96 Signed-off-by: liamfallon --- .../policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumer.java | 2 +- .../policy/apex/plugins/event/carrier/kafka/ApexKafkaProducer.java | 7 ++++--- .../event/carrier/kafka/KafkaCarrierTechnologyParameters.java | 1 - 3 files changed, 5 insertions(+), 5 deletions(-) (limited to 'plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka') diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumer.java b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumer.java index 6860bca22..3ddc97a86 100644 --- a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumer.java +++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumer.java @@ -167,7 +167,7 @@ public class ApexKafkaConsumer implements ApexEventConsumer, Runnable { kafkaConsumer.poll(kafkaConsumerProperties.getConsumerPollDuration().toMillis()); for (final ConsumerRecord record : records) { traceIfTraceEnabled(record); - eventReceiver.receiveEvent(record.value()); + eventReceiver.receiveEvent(null, record.value()); } } catch (final Exception e) { LOGGER.warn("error receiving events on thread {}", consumerThread.getName(), e); diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaProducer.java b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaProducer.java index c83c0ae1e..706568ae7 100644 --- a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaProducer.java +++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaProducer.java @@ -22,6 +22,7 @@ package org.onap.policy.apex.plugins.event.carrier.kafka; import java.util.EnumMap; import java.util.Map; +import java.util.Properties; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; @@ -66,8 +67,7 @@ public class ApexKafkaProducer implements ApexEventProducer { if (!(producerParameters.getCarrierTechnologyParameters() instanceof KafkaCarrierTechnologyParameters)) { String message = "specified producer properties are not applicable to a Kafka producer (" + this.name + ")"; LOGGER.warn(message); - throw new ApexEventException( - message); + throw new ApexEventException(message); } kafkaProducerProperties = (KafkaCarrierTechnologyParameters) producerParameters.getCarrierTechnologyParameters(); @@ -112,7 +112,8 @@ public class ApexKafkaProducer implements ApexEventProducer { * java.lang.Object) */ @Override - public void sendEvent(final long executionId, final String eventName, final Object event) { + public void sendEvent(final long executionId, final Properties executionProperties, final String eventName, + final Object event) { // Check if this is a synchronized event, if so we have received a reply final SynchronousEventCache synchronousEventCache = (SynchronousEventCache) peerReferenceMap.get(EventHandlerPeeredMode.SYNCHRONOUS); diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/KafkaCarrierTechnologyParameters.java b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/KafkaCarrierTechnologyParameters.java index f66dbfe9e..620b1fb02 100644 --- a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/KafkaCarrierTechnologyParameters.java +++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/KafkaCarrierTechnologyParameters.java @@ -55,7 +55,6 @@ public class KafkaCarrierTechnologyParameters extends CarrierTechnologyParameter public static final String KAFKA_EVENT_CONSUMER_PLUGIN_CLASS = ApexKafkaConsumer.class.getName(); // Repeated strings in messages - private static final String SPECIFY_AS_STRING_MESSAGE = "not specified, must be specified as a string"; private static final String ENTRY = "entry "; private static final String KAFKA_PROPERTIES = "kafkaProperties"; -- cgit 1.2.3-korg