diff options
Diffstat (limited to 'plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaProducer.java')
-rw-r--r-- | plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaProducer.java | 151 |
1 files changed, 151 insertions, 0 deletions
diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaProducer.java b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaProducer.java new file mode 100644 index 000000000..fb851bc70 --- /dev/null +++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaProducer.java @@ -0,0 +1,151 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2016-2018 Ericsson. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.apex.plugins.event.carrier.kafka; + +import java.util.EnumMap; +import java.util.Map; + +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.onap.policy.apex.service.engine.event.ApexEventException; +import org.onap.policy.apex.service.engine.event.ApexEventProducer; +import org.onap.policy.apex.service.engine.event.PeeredReference; +import org.onap.policy.apex.service.engine.event.SynchronousEventCache; +import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerParameters; +import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerPeeredMode; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Concrete implementation of an Apex event producer that sends events using Kafka. + * + * @author Liam Fallon (liam.fallon@ericsson.com) + */ +public class ApexKafkaProducer implements ApexEventProducer { + + // Get a reference to the logger + private static final Logger LOGGER = LoggerFactory.getLogger(ApexKafkaProducer.class); + + // The Kafka parameters read from the parameter service + private KAFKACarrierTechnologyParameters kafkaProducerProperties; + + // The Kafka Producer used to send events using Kafka + private Producer<String, Object> kafkaProducer; + + // The name for this producer + private String name = null; + + // The peer references for this event handler + private Map<EventHandlerPeeredMode, PeeredReference> peerReferenceMap = new EnumMap<>(EventHandlerPeeredMode.class); + + @Override + public void init(final String producerName, final EventHandlerParameters producerParameters) + throws ApexEventException { + this.name = producerName; + + // Check and get the Kafka Properties + if (!(producerParameters.getCarrierTechnologyParameters() instanceof KAFKACarrierTechnologyParameters)) { + LOGGER.warn("specified producer properties are not applicable to a Kafka producer (" + this.name + ")"); + throw new ApexEventException( + "specified producer properties are not applicable to a Kafka producer (" + this.name + ")"); + } + kafkaProducerProperties = + (KAFKACarrierTechnologyParameters) producerParameters.getCarrierTechnologyParameters(); + } + + /* + * (non-Javadoc) + * + * @see org.onap.policy.apex.service.engine.event.ApexEventProducer#getName() + */ + @Override + public String getName() { + return name; + } + + /* + * (non-Javadoc) + * + * @see org.onap.policy.apex.service.engine.event.ApexEventProducer#getPeeredReference(org.onap.policy.apex.service. + * parameters.eventhandler.EventHandlerPeeredMode) + */ + @Override + public PeeredReference getPeeredReference(final EventHandlerPeeredMode peeredMode) { + return peerReferenceMap.get(peeredMode); + } + + /* + * (non-Javadoc) + * + * @see org.onap.policy.apex.service.engine.event.ApexEventProducer#setPeeredReference(org.onap.policy.apex.service. + * parameters.eventhandler.EventHandlerPeeredMode, org.onap.policy.apex.service.engine.event.PeeredReference) + */ + @Override + public void setPeeredReference(final EventHandlerPeeredMode peeredMode, final PeeredReference peeredReference) { + peerReferenceMap.put(peeredMode, peeredReference); + } + + /* + * (non-Javadoc) + * + * @see org.onap.policy.apex.apps.uservice.producer.ApexEventProducer#sendEvent(long, java.lang.String, + * java.lang.Object) + */ + @Override + public void sendEvent(final long executionId, final String eventName, final Object event) { + // Check if this is a synchronized event, if so we have received a reply + final SynchronousEventCache synchronousEventCache = + (SynchronousEventCache) peerReferenceMap.get(EventHandlerPeeredMode.SYNCHRONOUS); + if (synchronousEventCache != null) { + synchronousEventCache.removeCachedEventToApexIfExists(executionId); + } + + // Kafka producer must be started in the same thread as it is stopped, so we must start it here + if (kafkaProducer == null) { + // Kick off the Kafka producer + kafkaProducer = new KafkaProducer<String, Object>(kafkaProducerProperties.getKafkaProducerProperties()); + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("event producer " + this.name + " is ready to send to topics: " + + kafkaProducerProperties.getProducerTopic()); + } + } + + kafkaProducer.send(new ProducerRecord<String, Object>(kafkaProducerProperties.getProducerTopic(), name, event)); + if (LOGGER.isTraceEnabled()) { + LOGGER.trace("event sent from engine using {} to topic {} : {} ", this.name, + kafkaProducerProperties.getProducerTopic(), event); + } + } + + /* + * (non-Javadoc) + * + * @see org.onap.policy.apex.apps.uservice.producer.ApexEventProducer#stop() + */ + @Override + public void stop() { + if (kafkaProducer != null) { + kafkaProducer.flush(); + kafkaProducer.close(); + } + } +} |