diff options
Diffstat (limited to 'plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka')
2 files changed, 5 insertions, 4 deletions
diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumer.java b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumer.java index 6860bca22..3ddc97a86 100644 --- a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumer.java +++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumer.java @@ -167,7 +167,7 @@ public class ApexKafkaConsumer implements ApexEventConsumer, Runnable { kafkaConsumer.poll(kafkaConsumerProperties.getConsumerPollDuration().toMillis()); for (final ConsumerRecord<String, String> record : records) { traceIfTraceEnabled(record); - eventReceiver.receiveEvent(record.value()); + eventReceiver.receiveEvent(null, record.value()); } } catch (final Exception e) { LOGGER.warn("error receiving events on thread {}", consumerThread.getName(), e); diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaProducer.java b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaProducer.java index c83c0ae1e..706568ae7 100644 --- a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaProducer.java +++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaProducer.java @@ -22,6 +22,7 @@ package org.onap.policy.apex.plugins.event.carrier.kafka; import java.util.EnumMap; import java.util.Map; +import java.util.Properties; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; @@ -66,8 +67,7 @@ public class ApexKafkaProducer implements ApexEventProducer { if (!(producerParameters.getCarrierTechnologyParameters() instanceof KafkaCarrierTechnologyParameters)) { String message = "specified producer properties are not applicable to a Kafka producer (" + this.name + ")"; LOGGER.warn(message); - throw new ApexEventException( - message); + throw new ApexEventException(message); } kafkaProducerProperties = (KafkaCarrierTechnologyParameters) producerParameters.getCarrierTechnologyParameters(); @@ -112,7 +112,8 @@ public class ApexKafkaProducer implements ApexEventProducer { * java.lang.Object) */ @Override - public void sendEvent(final long executionId, final String eventName, final Object event) { + public void sendEvent(final long executionId, final Properties executionProperties, final String eventName, + final Object event) { // Check if this is a synchronized event, if so we have received a reply final SynchronousEventCache synchronousEventCache = (SynchronousEventCache) peerReferenceMap.get(EventHandlerPeeredMode.SYNCHRONOUS); |