From e1735a6044b9a0b5ec8e3afede88744a7d57782a Mon Sep 17 00:00:00 2001 From: "ning.xi" Date: Tue, 25 Feb 2020 16:55:22 +0800 Subject: remove duplication code in plugins-event producer Issue-ID: POLICY-1884 Change-Id: I7774dfdc7d7edcc0409a817bfd21ddacec21d085 Signed-off-by: ning.xi --- .../event/carrier/kafka/ApexKafkaProducer.java | 42 +++------------------- 1 file changed, 4 insertions(+), 38 deletions(-) (limited to 'plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src') diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaProducer.java b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaProducer.java index 98ff61121..bb5398ceb 100644 --- a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaProducer.java +++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaProducer.java @@ -1,6 +1,7 @@ /*- * ============LICENSE_START======================================================= * Copyright (C) 2016-2018 Ericsson. All rights reserved. + * Modifications Copyright (C) 2020 Nordix Foundation. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -28,7 +29,7 @@ import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import org.onap.policy.apex.service.engine.event.ApexEventException; -import org.onap.policy.apex.service.engine.event.ApexEventProducer; +import org.onap.policy.apex.service.engine.event.ApexPluginsEventProducer; import org.onap.policy.apex.service.engine.event.PeeredReference; import org.onap.policy.apex.service.engine.event.SynchronousEventCache; import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerParameters; @@ -41,7 +42,7 @@ import org.slf4j.LoggerFactory; * * @author Liam Fallon (liam.fallon@ericsson.com) */ -public class ApexKafkaProducer implements ApexEventProducer { +public class ApexKafkaProducer extends ApexPluginsEventProducer { // Get a reference to the logger private static final Logger LOGGER = LoggerFactory.getLogger(ApexKafkaProducer.class); @@ -52,12 +53,6 @@ public class ApexKafkaProducer implements ApexEventProducer { // The Kafka Producer used to send events using Kafka private Producer kafkaProducer; - // The name for this producer - private String name = null; - - // The peer references for this event handler - private Map peerReferenceMap = new EnumMap<>(EventHandlerPeeredMode.class); - @Override public void init(final String producerName, final EventHandlerParameters producerParameters) throws ApexEventException { @@ -73,42 +68,13 @@ public class ApexKafkaProducer implements ApexEventProducer { (KafkaCarrierTechnologyParameters) producerParameters.getCarrierTechnologyParameters(); } - /** - * {@inheritDoc}. - */ - @Override - public String getName() { - return name; - } - - /** - * {@inheritDoc}. - */ - @Override - public PeeredReference getPeeredReference(final EventHandlerPeeredMode peeredMode) { - return peerReferenceMap.get(peeredMode); - } - - /** - * {@inheritDoc}. - */ - @Override - public void setPeeredReference(final EventHandlerPeeredMode peeredMode, final PeeredReference peeredReference) { - peerReferenceMap.put(peeredMode, peeredReference); - } - /** * {@inheritDoc}. */ @Override public void sendEvent(final long executionId, final Properties executionProperties, final String eventName, final Object event) { - // Check if this is a synchronized event, if so we have received a reply - final SynchronousEventCache synchronousEventCache = - (SynchronousEventCache) peerReferenceMap.get(EventHandlerPeeredMode.SYNCHRONOUS); - if (synchronousEventCache != null) { - synchronousEventCache.removeCachedEventToApexIfExists(executionId); - } + super.sendEvent(executionId, executionProperties, eventName, event); // Kafka producer must be started in the same thread as it is stopped, so we must start it here if (kafkaProducer == null) { -- cgit 1.2.3-korg