summaryrefslogtreecommitdiffstats
path: root/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java')
-rw-r--r--plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumer.java198
-rw-r--r--plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaProducer.java151
-rw-r--r--plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/KAFKACarrierTechnologyParameters.java396
-rw-r--r--plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/package-info.java27
4 files changed, 772 insertions, 0 deletions
diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumer.java b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumer.java
new file mode 100644
index 000000000..3351a58e9
--- /dev/null
+++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumer.java
@@ -0,0 +1,198 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2016-2018 Ericsson. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.apex.plugins.event.carrier.kafka;
+
+import java.util.EnumMap;
+import java.util.Map;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.onap.policy.apex.core.infrastructure.threading.ApplicationThreadFactory;
+import org.onap.policy.apex.core.infrastructure.threading.ThreadUtilities;
+import org.onap.policy.apex.service.engine.event.ApexEventConsumer;
+import org.onap.policy.apex.service.engine.event.ApexEventException;
+import org.onap.policy.apex.service.engine.event.ApexEventReceiver;
+import org.onap.policy.apex.service.engine.event.PeeredReference;
+import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerParameters;
+import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerPeeredMode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class implements an Apex event consumer that receives events using Kafka.
+ *
+ * @author Liam Fallon (liam.fallon@ericsson.com)
+ */
+public class ApexKafkaConsumer implements ApexEventConsumer, Runnable {
+ // Get a reference to the logger
+ private static final Logger LOGGER = LoggerFactory.getLogger(ApexKafkaConsumer.class);
+
+ // The Kafka parameters read from the parameter service
+ private KAFKACarrierTechnologyParameters kafkaConsumerProperties;
+
+ // The event receiver that will receive events from this consumer
+ private ApexEventReceiver eventReceiver;
+
+ // The Kafka consumer used to receive events using Kafka
+ private KafkaConsumer<String, String> kafkaConsumer;
+
+ // The name for this consumer
+ private String name = null;
+
+ // The peer references for this event handler
+ private Map<EventHandlerPeeredMode, PeeredReference> peerReferenceMap = new EnumMap<>(EventHandlerPeeredMode.class);
+
+ // The consumer thread and stopping flag
+ private Thread consumerThread;
+ private boolean stopOrderedFlag = false;
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.onap.policy.apex.service.engine.event.ApexEventConsumer#init(java.lang.String,
+ * org.onap.policy.apex.service.parameters.eventhandler.EventHandlerParameters,
+ * org.onap.policy.apex.service.engine.event.ApexEventReceiver)
+ */
+ @Override
+ public void init(final String consumerName, final EventHandlerParameters consumerParameters,
+ final ApexEventReceiver incomingEventReceiver) throws ApexEventException {
+ this.eventReceiver = incomingEventReceiver;
+ this.name = consumerName;
+
+ // Check and get the Kafka Properties
+ if (!(consumerParameters.getCarrierTechnologyParameters() instanceof KAFKACarrierTechnologyParameters)) {
+ LOGGER.warn("specified consumer properties of type \""
+ + consumerParameters.getCarrierTechnologyParameters().getClass().getCanonicalName()
+ + "\" are not applicable to a Kafka consumer");
+ throw new ApexEventException("specified consumer properties of type \""
+ + consumerParameters.getCarrierTechnologyParameters().getClass().getCanonicalName()
+ + "\" are not applicable to a Kafka consumer");
+ }
+ kafkaConsumerProperties =
+ (KAFKACarrierTechnologyParameters) consumerParameters.getCarrierTechnologyParameters();
+
+ // Kick off the Kafka consumer
+ kafkaConsumer = new KafkaConsumer<String, String>(kafkaConsumerProperties.getKafkaConsumerProperties());
+ kafkaConsumer.subscribe(kafkaConsumerProperties.getConsumerTopicList());
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("event receiver for " + this.getClass().getName() + ":" + this.name + " subscribed to topics: "
+ + kafkaConsumerProperties.getConsumerTopicList());
+ }
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.onap.policy.apex.service.engine.event.ApexEventConsumer#start()
+ */
+ @Override
+ public void start() {
+ // Configure and start the event reception thread
+ final String threadName = this.getClass().getName() + ":" + this.name;
+ consumerThread = new ApplicationThreadFactory(threadName).newThread(this);
+ consumerThread.setDaemon(true);
+ consumerThread.start();
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.onap.policy.apex.service.engine.event.ApexEventConsumer#getName()
+ */
+ @Override
+ public String getName() {
+ return name;
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.onap.policy.apex.service.engine.event.ApexEventConsumer#getPeeredReference(org.onap.policy.apex.service.
+ * parameters.eventhandler.EventHandlerPeeredMode)
+ */
+ @Override
+ public PeeredReference getPeeredReference(final EventHandlerPeeredMode peeredMode) {
+ return peerReferenceMap.get(peeredMode);
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.onap.policy.apex.service.engine.event.ApexEventConsumer#setPeeredReference(org.onap.policy.apex.service.
+ * parameters.eventhandler.EventHandlerPeeredMode, org.onap.policy.apex.service.engine.event.PeeredReference)
+ */
+ @Override
+ public void setPeeredReference(final EventHandlerPeeredMode peeredMode, final PeeredReference peeredReference) {
+ peerReferenceMap.put(peeredMode, peeredReference);
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see java.lang.Runnable#run()
+ */
+ @Override
+ public void run() {
+ // Kick off the Kafka consumer
+ kafkaConsumer = new KafkaConsumer<String, String>(kafkaConsumerProperties.getKafkaConsumerProperties());
+ kafkaConsumer.subscribe(kafkaConsumerProperties.getConsumerTopicList());
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("event receiver for " + this.getClass().getName() + ":" + this.name + " subscribed to topics: "
+ + kafkaConsumerProperties.getConsumerTopicList());
+ }
+
+ // The endless loop that receives events over Kafka
+ while (consumerThread.isAlive() && !stopOrderedFlag) {
+ try {
+ final ConsumerRecords<String, String> records =
+ kafkaConsumer.poll(kafkaConsumerProperties.getConsumerPollTime());
+ for (final ConsumerRecord<String, String> record : records) {
+ if (LOGGER.isTraceEnabled()) {
+ LOGGER.trace("event received for {} for forwarding to Apex engine : {} {}",
+ this.getClass().getName() + ":" + this.name, record.key(), record.value());
+ }
+ eventReceiver.receiveEvent(record.value());
+ }
+ } catch (final Exception e) {
+ LOGGER.warn("error receiving events on thread {}", consumerThread.getName(), e);
+ }
+ }
+
+ if (!consumerThread.isInterrupted()) {
+ kafkaConsumer.close();
+ }
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.onap.policy.apex.service.engine.event.ApexEventConsumer#stop()
+ */
+ @Override
+ public void stop() {
+ stopOrderedFlag = true;
+
+ while (consumerThread.isAlive()) {
+ ThreadUtilities.sleep(kafkaConsumerProperties.getConsumerPollTime());
+ }
+ }
+}
diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaProducer.java b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaProducer.java
new file mode 100644
index 000000000..fb851bc70
--- /dev/null
+++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaProducer.java
@@ -0,0 +1,151 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2016-2018 Ericsson. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.apex.plugins.event.carrier.kafka;
+
+import java.util.EnumMap;
+import java.util.Map;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.onap.policy.apex.service.engine.event.ApexEventException;
+import org.onap.policy.apex.service.engine.event.ApexEventProducer;
+import org.onap.policy.apex.service.engine.event.PeeredReference;
+import org.onap.policy.apex.service.engine.event.SynchronousEventCache;
+import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerParameters;
+import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerPeeredMode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Concrete implementation of an Apex event producer that sends events using Kafka.
+ *
+ * @author Liam Fallon (liam.fallon@ericsson.com)
+ */
+public class ApexKafkaProducer implements ApexEventProducer {
+
+ // Get a reference to the logger
+ private static final Logger LOGGER = LoggerFactory.getLogger(ApexKafkaProducer.class);
+
+ // The Kafka parameters read from the parameter service
+ private KAFKACarrierTechnologyParameters kafkaProducerProperties;
+
+ // The Kafka Producer used to send events using Kafka
+ private Producer<String, Object> kafkaProducer;
+
+ // The name for this producer
+ private String name = null;
+
+ // The peer references for this event handler
+ private Map<EventHandlerPeeredMode, PeeredReference> peerReferenceMap = new EnumMap<>(EventHandlerPeeredMode.class);
+
+ @Override
+ public void init(final String producerName, final EventHandlerParameters producerParameters)
+ throws ApexEventException {
+ this.name = producerName;
+
+ // Check and get the Kafka Properties
+ if (!(producerParameters.getCarrierTechnologyParameters() instanceof KAFKACarrierTechnologyParameters)) {
+ LOGGER.warn("specified producer properties are not applicable to a Kafka producer (" + this.name + ")");
+ throw new ApexEventException(
+ "specified producer properties are not applicable to a Kafka producer (" + this.name + ")");
+ }
+ kafkaProducerProperties =
+ (KAFKACarrierTechnologyParameters) producerParameters.getCarrierTechnologyParameters();
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.onap.policy.apex.service.engine.event.ApexEventProducer#getName()
+ */
+ @Override
+ public String getName() {
+ return name;
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.onap.policy.apex.service.engine.event.ApexEventProducer#getPeeredReference(org.onap.policy.apex.service.
+ * parameters.eventhandler.EventHandlerPeeredMode)
+ */
+ @Override
+ public PeeredReference getPeeredReference(final EventHandlerPeeredMode peeredMode) {
+ return peerReferenceMap.get(peeredMode);
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.onap.policy.apex.service.engine.event.ApexEventProducer#setPeeredReference(org.onap.policy.apex.service.
+ * parameters.eventhandler.EventHandlerPeeredMode, org.onap.policy.apex.service.engine.event.PeeredReference)
+ */
+ @Override
+ public void setPeeredReference(final EventHandlerPeeredMode peeredMode, final PeeredReference peeredReference) {
+ peerReferenceMap.put(peeredMode, peeredReference);
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.onap.policy.apex.apps.uservice.producer.ApexEventProducer#sendEvent(long, java.lang.String,
+ * java.lang.Object)
+ */
+ @Override
+ public void sendEvent(final long executionId, final String eventName, final Object event) {
+ // Check if this is a synchronized event, if so we have received a reply
+ final SynchronousEventCache synchronousEventCache =
+ (SynchronousEventCache) peerReferenceMap.get(EventHandlerPeeredMode.SYNCHRONOUS);
+ if (synchronousEventCache != null) {
+ synchronousEventCache.removeCachedEventToApexIfExists(executionId);
+ }
+
+ // Kafka producer must be started in the same thread as it is stopped, so we must start it here
+ if (kafkaProducer == null) {
+ // Kick off the Kafka producer
+ kafkaProducer = new KafkaProducer<String, Object>(kafkaProducerProperties.getKafkaProducerProperties());
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("event producer " + this.name + " is ready to send to topics: "
+ + kafkaProducerProperties.getProducerTopic());
+ }
+ }
+
+ kafkaProducer.send(new ProducerRecord<String, Object>(kafkaProducerProperties.getProducerTopic(), name, event));
+ if (LOGGER.isTraceEnabled()) {
+ LOGGER.trace("event sent from engine using {} to topic {} : {} ", this.name,
+ kafkaProducerProperties.getProducerTopic(), event);
+ }
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.onap.policy.apex.apps.uservice.producer.ApexEventProducer#stop()
+ */
+ @Override
+ public void stop() {
+ if (kafkaProducer != null) {
+ kafkaProducer.flush();
+ kafkaProducer.close();
+ }
+ }
+}
diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/KAFKACarrierTechnologyParameters.java b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/KAFKACarrierTechnologyParameters.java
new file mode 100644
index 000000000..2357b1807
--- /dev/null
+++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/KAFKACarrierTechnologyParameters.java
@@ -0,0 +1,396 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2016-2018 Ericsson. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.apex.plugins.event.carrier.kafka;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Properties;
+
+import org.onap.policy.apex.service.parameters.carriertechnology.CarrierTechnologyParameters;
+
+/**
+ * Apex parameters for Kafka as an event carrier technology.
+ *
+ * @author Liam Fallon (liam.fallon@ericsson.com)
+ */
+public class KAFKACarrierTechnologyParameters extends CarrierTechnologyParameters {
+ // @formatter:off
+ /** The label of this carrier technology. */
+ public static final String KAFKA_CARRIER_TECHNOLOGY_LABEL = "KAFKA";
+
+ /** The producer plugin class for the Kafka carrier technology. */
+ public static final String KAFKA_EVENT_PRODUCER_PLUGIN_CLASS = ApexKafkaProducer.class.getCanonicalName();
+
+ /** The consumer plugin class for the Kafka carrier technology. */
+ public static final String KAFKA_EVENT_CONSUMER_PLUGIN_CLASS = ApexKafkaConsumer.class.getCanonicalName();
+
+ // Default parameter values
+ private static final String DEFAULT_ACKS = "all";
+ private static final String DEFAULT_BOOTSTRAP_SERVERS = "localhost:9092";
+ private static final int DEFAULT_RETRIES = 0;
+ private static final int DEFAULT_BATCH_SIZE = 16384;
+ private static final int DEFAULT_LINGER_TIME = 1;
+ private static final long DEFAULT_BUFFER_MEMORY = 33554432;
+ private static final String DEFAULT_GROUP_ID = "default-group-id";
+ private static final boolean DEFAULT_ENABLE_AUTO_COMMIT = true;
+ private static final int DEFAULT_AUTO_COMMIT_TIME = 1000;
+ private static final int DEFAULT_SESSION_TIMEOUT = 30000;
+ private static final String DEFAULT_PRODUCER_TOPIC = "apex-out";
+ private static final int DEFAULT_CONSUMER_POLL_TIME = 100;
+ private static final String[] DEFAULT_CONSUMER_TOPIC_LIST = {"apex-in"};
+ private static final String DEFAULT_KEY_SERIALIZER = "org.apache.kafka.common.serialization.StringSerializer";
+ private static final String DEFAULT_VALUE_SERIALIZER = "org.apache.kafka.common.serialization.StringSerializer";
+ private static final String DEFAULT_KEY_DESERIALIZER = "org.apache.kafka.common.serialization.StringDeserializer";
+ private static final String DEFAULT_VALUE_DESERIALIZER = "org.apache.kafka.common.serialization.StringDeserializer";
+
+ // Parameter property map tokens
+ private static final String PROPERTY_BOOTSTRAP_SERVERS = "bootstrap.servers";
+ private static final String PROPERTY_ACKS = "acks";
+ private static final String PROPERTY_RETRIES = "retries";
+ private static final String PROPERTY_BATCH_SIZE = "batch.size";
+ private static final String PROPERTY_LINGER_TIME = "linger.ms";
+ private static final String PROPERTY_BUFFER_MEMORY = "buffer.memory";
+ private static final String PROPERTY_GROUP_ID = "group.id";
+ private static final String PROPERTY_ENABLE_AUTO_COMMIT = "enable.auto.commit";
+ private static final String PROPERTY_AUTO_COMMIT_TIME = "auto.commit.interval.ms";
+ private static final String PROPERTY_SESSION_TIMEOUT = "session.timeout.ms";
+ private static final String PROPERTY_KEY_SERIALIZER = "key.serializer";
+ private static final String PROPERTY_VALUE_SERIALIZER = "value.serializer";
+ private static final String PROPERTY_KEY_DESERIALIZER = "key.deserializer";
+ private static final String PROPERTY_VALUE_DESERIALIZER = "value.deserializer";
+
+ // kafka carrier parameters
+ private String bootstrapServers = DEFAULT_BOOTSTRAP_SERVERS;
+ private String acks = DEFAULT_ACKS;
+ private int retries = DEFAULT_RETRIES;
+ private int batchSize = DEFAULT_BATCH_SIZE;
+ private int lingerTime = DEFAULT_LINGER_TIME;
+ private long bufferMemory = DEFAULT_BUFFER_MEMORY;
+ private String groupId = DEFAULT_GROUP_ID;
+ private boolean enableAutoCommit = DEFAULT_ENABLE_AUTO_COMMIT;
+ private int autoCommitTime = DEFAULT_AUTO_COMMIT_TIME;
+ private int sessionTimeout = DEFAULT_SESSION_TIMEOUT;
+ private String producerTopic = DEFAULT_PRODUCER_TOPIC;
+ private int consumerPollTime = DEFAULT_CONSUMER_POLL_TIME;
+ private String[] consumerTopicList = DEFAULT_CONSUMER_TOPIC_LIST;
+ private String keySerializer = DEFAULT_KEY_SERIALIZER;
+ private String valueSerializer = DEFAULT_VALUE_SERIALIZER;
+ private String keyDeserializer = DEFAULT_KEY_DESERIALIZER;
+ private String valueDeserializer = DEFAULT_VALUE_DESERIALIZER;
+ // @formatter:on
+
+ /**
+ * Constructor to create a kafka carrier technology parameters instance and register the instance with the parameter
+ * service.
+ */
+ public KAFKACarrierTechnologyParameters() {
+ super(KAFKACarrierTechnologyParameters.class.getCanonicalName());
+
+ // Set the carrier technology properties for the kafka carrier technology
+ this.setLabel(KAFKA_CARRIER_TECHNOLOGY_LABEL);
+ this.setEventProducerPluginClass(KAFKA_EVENT_PRODUCER_PLUGIN_CLASS);
+ this.setEventConsumerPluginClass(KAFKA_EVENT_CONSUMER_PLUGIN_CLASS);
+ }
+
+ /**
+ * Gets the kafka producer properties.
+ *
+ * @return the kafka producer properties
+ */
+ public Properties getKafkaProducerProperties() {
+ final Properties kafkaProperties = new Properties();
+
+ kafkaProperties.put(PROPERTY_BOOTSTRAP_SERVERS, bootstrapServers);
+ kafkaProperties.put(PROPERTY_ACKS, acks);
+ kafkaProperties.put(PROPERTY_RETRIES, retries);
+ kafkaProperties.put(PROPERTY_BATCH_SIZE, batchSize);
+ kafkaProperties.put(PROPERTY_LINGER_TIME, lingerTime);
+ kafkaProperties.put(PROPERTY_BUFFER_MEMORY, bufferMemory);
+ kafkaProperties.put(PROPERTY_KEY_SERIALIZER, keySerializer);
+ kafkaProperties.put(PROPERTY_VALUE_SERIALIZER, valueSerializer);
+
+ return kafkaProperties;
+ }
+
+ /**
+ * Gets the kafka consumer properties.
+ *
+ * @return the kafka consumer properties
+ */
+ public Properties getKafkaConsumerProperties() {
+ final Properties kafkaProperties = new Properties();
+
+ kafkaProperties.put(PROPERTY_BOOTSTRAP_SERVERS, bootstrapServers);
+ kafkaProperties.put(PROPERTY_GROUP_ID, groupId);
+ kafkaProperties.put(PROPERTY_ENABLE_AUTO_COMMIT, enableAutoCommit);
+ kafkaProperties.put(PROPERTY_AUTO_COMMIT_TIME, autoCommitTime);
+ kafkaProperties.put(PROPERTY_SESSION_TIMEOUT, sessionTimeout);
+ kafkaProperties.put(PROPERTY_KEY_DESERIALIZER, keyDeserializer);
+ kafkaProperties.put(PROPERTY_VALUE_DESERIALIZER, valueDeserializer);
+
+ return kafkaProperties;
+ }
+
+ /**
+ * Gets the bootstrap servers.
+ *
+ * @return the bootstrap servers
+ */
+ public String getBootstrapServers() {
+ return bootstrapServers;
+ }
+
+ /**
+ * Gets the acks.
+ *
+ * @return the acks
+ */
+ public String getAcks() {
+ return acks;
+ }
+
+ /**
+ * Gets the retries.
+ *
+ * @return the retries
+ */
+ public int getRetries() {
+ return retries;
+ }
+
+ /**
+ * Gets the batch size.
+ *
+ * @return the batch size
+ */
+ public int getBatchSize() {
+ return batchSize;
+ }
+
+ /**
+ * Gets the linger time.
+ *
+ * @return the linger time
+ */
+ public int getLingerTime() {
+ return lingerTime;
+ }
+
+ /**
+ * Gets the buffer memory.
+ *
+ * @return the buffer memory
+ */
+ public long getBufferMemory() {
+ return bufferMemory;
+ }
+
+ /**
+ * Gets the group id.
+ *
+ * @return the group id
+ */
+ public String getGroupId() {
+ return groupId;
+ }
+
+ /**
+ * Checks if is enable auto commit.
+ *
+ * @return true, if checks if is enable auto commit
+ */
+ public boolean isEnableAutoCommit() {
+ return enableAutoCommit;
+ }
+
+ /**
+ * Gets the auto commit time.
+ *
+ * @return the auto commit time
+ */
+ public int getAutoCommitTime() {
+ return autoCommitTime;
+ }
+
+ /**
+ * Gets the session timeout.
+ *
+ * @return the session timeout
+ */
+ public int getSessionTimeout() {
+ return sessionTimeout;
+ }
+
+ /**
+ * Gets the producer topic.
+ *
+ * @return the producer topic
+ */
+ public String getProducerTopic() {
+ return producerTopic;
+ }
+
+ /**
+ * Gets the consumer poll time.
+ *
+ * @return the consumer poll time
+ */
+ public long getConsumerPollTime() {
+ return consumerPollTime;
+ }
+
+ /**
+ * Gets the consumer topic list.
+ *
+ * @return the consumer topic list
+ */
+ public Collection<String> getConsumerTopicList() {
+ return Arrays.asList(consumerTopicList);
+ }
+
+ /**
+ * Gets the key serializer.
+ *
+ * @return the key serializer
+ */
+ public String getKeySerializer() {
+ return keySerializer;
+ }
+
+ /**
+ * Gets the value serializer.
+ *
+ * @return the value serializer
+ */
+ public String getValueSerializer() {
+ return valueSerializer;
+ }
+
+ /**
+ * Gets the key deserializer.
+ *
+ * @return the key deserializer
+ */
+ public String getKeyDeserializer() {
+ return keyDeserializer;
+ }
+
+ /**
+ * Gets the value deserializer.
+ *
+ * @return the value deserializer
+ */
+ public String getValueDeserializer() {
+ return valueDeserializer;
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.onap.policy.apex.apps.uservice.parameters.ApexParameterValidator#validate()
+ */
+ @Override
+ public String validate() {
+ final StringBuilder errorMessageBuilder = new StringBuilder();
+
+ errorMessageBuilder.append(super.validate());
+
+ if (bootstrapServers == null || bootstrapServers.trim().length() == 0) {
+ errorMessageBuilder
+ .append(" bootstrapServers not specified, must be specified as a string of form host:port\n");
+ }
+
+ if (acks == null || acks.trim().length() == 0) {
+ errorMessageBuilder.append(" acks not specified, must be specified as a string with values [0|1|all]\n");
+ }
+
+ if (retries < 0) {
+ errorMessageBuilder.append(" retries [" + retries + "] invalid, must be specified as retries >= 0\n");
+ }
+
+ if (batchSize < 0) {
+ errorMessageBuilder
+ .append(" batchSize [" + batchSize + "] invalid, must be specified as batchSize >= 0\n");
+ }
+
+ if (lingerTime < 0) {
+ errorMessageBuilder
+ .append(" lingerTime [" + lingerTime + "] invalid, must be specified as lingerTime >= 0\n");
+ }
+
+ if (bufferMemory < 0) {
+ errorMessageBuilder
+ .append(" bufferMemory [" + bufferMemory + "] invalid, must be specified as bufferMemory >= 0\n");
+ }
+
+ if (groupId == null || groupId.trim().length() == 0) {
+ errorMessageBuilder.append(" groupId not specified, must be specified as a string\n");
+ }
+
+ if (autoCommitTime < 0) {
+ errorMessageBuilder.append(
+ " autoCommitTime [" + autoCommitTime + "] invalid, must be specified as autoCommitTime >= 0\n");
+ }
+
+ if (sessionTimeout < 0) {
+ errorMessageBuilder.append(
+ " sessionTimeout [" + sessionTimeout + "] invalid, must be specified as sessionTimeout >= 0\n");
+ }
+
+ if (producerTopic == null || producerTopic.trim().length() == 0) {
+ errorMessageBuilder.append(" producerTopic not specified, must be specified as a string\n");
+ }
+
+ if (consumerPollTime < 0) {
+ errorMessageBuilder.append(" consumerPollTime [" + consumerPollTime
+ + "] invalid, must be specified as consumerPollTime >= 0\n");
+ }
+
+ if (consumerTopicList == null || consumerTopicList.length == 0) {
+ errorMessageBuilder.append(" consumerTopicList not specified, must be specified as a list of strings\n");
+ }
+
+ for (final String consumerTopic : consumerTopicList) {
+ if (consumerTopic == null || consumerTopic.trim().length() == 0) {
+ errorMessageBuilder.append(" invalid consumer topic \"" + consumerTopic
+ + "\" specified on consumerTopicList, consumer topics must be specified as strings\n");
+ }
+ }
+
+ if (keySerializer == null || keySerializer.trim().length() == 0) {
+ errorMessageBuilder.append(" keySerializer not specified, must be specified as a string\n");
+ }
+
+ if (valueSerializer == null || valueSerializer.trim().length() == 0) {
+ errorMessageBuilder.append(" valueSerializer not specified, must be specified as a string\n");
+ }
+
+ if (keyDeserializer == null || keyDeserializer.trim().length() == 0) {
+ errorMessageBuilder.append(" keyDeserializer not specified, must be specified as a string\n");
+ }
+
+ if (valueDeserializer == null || valueDeserializer.trim().length() == 0) {
+ errorMessageBuilder.append(" valueDeserializer not specified, must be specified as a string\n");
+ }
+
+ return errorMessageBuilder.toString();
+ }
+}
diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/package-info.java b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/package-info.java
new file mode 100644
index 000000000..f04aaceef
--- /dev/null
+++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/package-info.java
@@ -0,0 +1,27 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2016-2018 Ericsson. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+/**
+ * Implements the APEX event carrier technology plugin for Kafka.
+ *
+ * @author Liam Fallon (liam.fallon@ericsson.com)
+ */
+
+package org.onap.policy.apex.plugins.event.carrier.kafka;