summaryrefslogtreecommitdiffstats
path: root/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka')
-rw-r--r--plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumer.java2
-rw-r--r--plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaProducer.java7
2 files changed, 5 insertions, 4 deletions
diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumer.java b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumer.java
index 6860bca22..3ddc97a86 100644
--- a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumer.java
+++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumer.java
@@ -167,7 +167,7 @@ public class ApexKafkaConsumer implements ApexEventConsumer, Runnable {
kafkaConsumer.poll(kafkaConsumerProperties.getConsumerPollDuration().toMillis());
for (final ConsumerRecord<String, String> record : records) {
traceIfTraceEnabled(record);
- eventReceiver.receiveEvent(record.value());
+ eventReceiver.receiveEvent(null, record.value());
}
} catch (final Exception e) {
LOGGER.warn("error receiving events on thread {}", consumerThread.getName(), e);
diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaProducer.java b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaProducer.java
index c83c0ae1e..706568ae7 100644
--- a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaProducer.java
+++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaProducer.java
@@ -22,6 +22,7 @@ package org.onap.policy.apex.plugins.event.carrier.kafka;
import java.util.EnumMap;
import java.util.Map;
+import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
@@ -66,8 +67,7 @@ public class ApexKafkaProducer implements ApexEventProducer {
if (!(producerParameters.getCarrierTechnologyParameters() instanceof KafkaCarrierTechnologyParameters)) {
String message = "specified producer properties are not applicable to a Kafka producer (" + this.name + ")";
LOGGER.warn(message);
- throw new ApexEventException(
- message);
+ throw new ApexEventException(message);
}
kafkaProducerProperties =
(KafkaCarrierTechnologyParameters) producerParameters.getCarrierTechnologyParameters();
@@ -112,7 +112,8 @@ public class ApexKafkaProducer implements ApexEventProducer {
* java.lang.Object)
*/
@Override
- public void sendEvent(final long executionId, final String eventName, final Object event) {
+ public void sendEvent(final long executionId, final Properties executionProperties, final String eventName,
+ final Object event) {
// Check if this is a synchronized event, if so we have received a reply
final SynchronousEventCache synchronousEventCache =
(SynchronousEventCache) peerReferenceMap.get(EventHandlerPeeredMode.SYNCHRONOUS);