diff options
author | liamfallon <liam.fallon@est.tech> | 2019-06-26 15:40:41 +0000 |
---|---|---|
committer | liamfallon <liam.fallon@est.tech> | 2019-06-26 15:40:41 +0000 |
commit | ce9d82d2c0e863597d84cc8909955e398405f45a (patch) | |
tree | 8ebb853119bdf673cf6f9516d428d4cc00080aeb /plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main | |
parent | 5f029543f1e673655af2d2974113069df0b6def0 (diff) |
Add passthrough properties for APEX engine
APEX event receiver and sender plugins sometimes need to exchange
information with tasks, especially in the case of REST communication.
This change enables passthrough of Properties from the event carrier
technology plugins to APEX task, task selection, and state finalizer
logics.
Apologies for the size of the review but this change involves passinng
the properties through all the APEX components, hence the large number
of small changes.
Issue-ID: POLICY-1742
Change-Id: I219fd69550f06702ef64adbb165fe7baac422e96
Signed-off-by: liamfallon <liam.fallon@est.tech>
Diffstat (limited to 'plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main')
3 files changed, 5 insertions, 5 deletions
diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumer.java b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumer.java index 6860bca22..3ddc97a86 100644 --- a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumer.java +++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumer.java @@ -167,7 +167,7 @@ public class ApexKafkaConsumer implements ApexEventConsumer, Runnable { kafkaConsumer.poll(kafkaConsumerProperties.getConsumerPollDuration().toMillis()); for (final ConsumerRecord<String, String> record : records) { traceIfTraceEnabled(record); - eventReceiver.receiveEvent(record.value()); + eventReceiver.receiveEvent(null, record.value()); } } catch (final Exception e) { LOGGER.warn("error receiving events on thread {}", consumerThread.getName(), e); diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaProducer.java b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaProducer.java index c83c0ae1e..706568ae7 100644 --- a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaProducer.java +++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaProducer.java @@ -22,6 +22,7 @@ package org.onap.policy.apex.plugins.event.carrier.kafka; import java.util.EnumMap; import java.util.Map; +import java.util.Properties; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; @@ -66,8 +67,7 @@ public class ApexKafkaProducer implements ApexEventProducer { if (!(producerParameters.getCarrierTechnologyParameters() instanceof KafkaCarrierTechnologyParameters)) { String message = "specified producer properties are not applicable to a Kafka producer (" + this.name + ")"; LOGGER.warn(message); - throw new ApexEventException( - message); + throw new ApexEventException(message); } kafkaProducerProperties = (KafkaCarrierTechnologyParameters) producerParameters.getCarrierTechnologyParameters(); @@ -112,7 +112,8 @@ public class ApexKafkaProducer implements ApexEventProducer { * java.lang.Object) */ @Override - public void sendEvent(final long executionId, final String eventName, final Object event) { + public void sendEvent(final long executionId, final Properties executionProperties, final String eventName, + final Object event) { // Check if this is a synchronized event, if so we have received a reply final SynchronousEventCache synchronousEventCache = (SynchronousEventCache) peerReferenceMap.get(EventHandlerPeeredMode.SYNCHRONOUS); diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/KafkaCarrierTechnologyParameters.java b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/KafkaCarrierTechnologyParameters.java index f66dbfe9e..620b1fb02 100644 --- a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/KafkaCarrierTechnologyParameters.java +++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/KafkaCarrierTechnologyParameters.java @@ -55,7 +55,6 @@ public class KafkaCarrierTechnologyParameters extends CarrierTechnologyParameter public static final String KAFKA_EVENT_CONSUMER_PLUGIN_CLASS = ApexKafkaConsumer.class.getName(); // Repeated strings in messages - private static final String SPECIFY_AS_STRING_MESSAGE = "not specified, must be specified as a string"; private static final String ENTRY = "entry "; private static final String KAFKA_PROPERTIES = "kafkaProperties"; |