diff options
Diffstat (limited to 'plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src')
-rw-r--r-- | plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaProducer.java | 42 |
1 files changed, 4 insertions, 38 deletions
diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaProducer.java b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaProducer.java index 98ff61121..bb5398ceb 100644 --- a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaProducer.java +++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaProducer.java @@ -1,6 +1,7 @@ /*- * ============LICENSE_START======================================================= * Copyright (C) 2016-2018 Ericsson. All rights reserved. + * Modifications Copyright (C) 2020 Nordix Foundation. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -28,7 +29,7 @@ import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import org.onap.policy.apex.service.engine.event.ApexEventException; -import org.onap.policy.apex.service.engine.event.ApexEventProducer; +import org.onap.policy.apex.service.engine.event.ApexPluginsEventProducer; import org.onap.policy.apex.service.engine.event.PeeredReference; import org.onap.policy.apex.service.engine.event.SynchronousEventCache; import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerParameters; @@ -41,7 +42,7 @@ import org.slf4j.LoggerFactory; * * @author Liam Fallon (liam.fallon@ericsson.com) */ -public class ApexKafkaProducer implements ApexEventProducer { +public class ApexKafkaProducer extends ApexPluginsEventProducer { // Get a reference to the logger private static final Logger LOGGER = LoggerFactory.getLogger(ApexKafkaProducer.class); @@ -52,12 +53,6 @@ public class ApexKafkaProducer implements ApexEventProducer { // The Kafka Producer used to send events using Kafka private Producer<String, Object> kafkaProducer; - // The name for this producer - private String name = null; - - // The peer references for this event handler - private Map<EventHandlerPeeredMode, PeeredReference> peerReferenceMap = new EnumMap<>(EventHandlerPeeredMode.class); - @Override public void init(final String producerName, final EventHandlerParameters producerParameters) throws ApexEventException { @@ -77,38 +72,9 @@ public class ApexKafkaProducer implements ApexEventProducer { * {@inheritDoc}. */ @Override - public String getName() { - return name; - } - - /** - * {@inheritDoc}. - */ - @Override - public PeeredReference getPeeredReference(final EventHandlerPeeredMode peeredMode) { - return peerReferenceMap.get(peeredMode); - } - - /** - * {@inheritDoc}. - */ - @Override - public void setPeeredReference(final EventHandlerPeeredMode peeredMode, final PeeredReference peeredReference) { - peerReferenceMap.put(peeredMode, peeredReference); - } - - /** - * {@inheritDoc}. - */ - @Override public void sendEvent(final long executionId, final Properties executionProperties, final String eventName, final Object event) { - // Check if this is a synchronized event, if so we have received a reply - final SynchronousEventCache synchronousEventCache = - (SynchronousEventCache) peerReferenceMap.get(EventHandlerPeeredMode.SYNCHRONOUS); - if (synchronousEventCache != null) { - synchronousEventCache.removeCachedEventToApexIfExists(executionId); - } + super.sendEvent(executionId, executionProperties, eventName, event); // Kafka producer must be started in the same thread as it is stopped, so we must start it here if (kafkaProducer == null) { |