diff options
author | ramverma <ram.krishna.verma@ericsson.com> | 2018-06-08 11:56:21 +0100 |
---|---|---|
committer | ramverma <ram.krishna.verma@ericsson.com> | 2018-06-08 14:07:21 +0100 |
commit | 697d02bf4e4188e3040cd987dee97d15f397a35f (patch) | |
tree | 08c9f3231c155d33827a5651e1e0263d4b8db8be /plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka | |
parent | 9289ac0afefe62f6c8e9cebddb611a8571bf5642 (diff) |
Adding plugins-event module to apex-pdp
Adding plugins-event module to apex-pdp
Fix a minor bug in TextFileUtils
Change-Id: I393c5f5809d078850d6669d22759ba9fa1b4f0e6
Issue-ID: POLICY-862
Signed-off-by: ramverma <ram.krishna.verma@ericsson.com>
Diffstat (limited to 'plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka')
5 files changed, 816 insertions, 0 deletions
diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/pom.xml b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/pom.xml new file mode 100644 index 000000000..cd015bb64 --- /dev/null +++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/pom.xml @@ -0,0 +1,44 @@ +<!-- + ============LICENSE_START======================================================= + Copyright (C) 2018 Ericsson. All rights reserved. + ================================================================================ + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + + SPDX-License-Identifier: Apache-2.0 + ============LICENSE_END========================================================= +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.onap.policy.apex-pdp.plugins.plugins-event.plugins-event-carrier</groupId> + <artifactId>plugins-event-carrier</artifactId> + <version>2.0.0-SNAPSHOT</version> + </parent> + + <artifactId>plugins-event-carrier-kafka</artifactId> + <name>${project.artifactId}</name> + <description>[${project.parent.artifactId}] Plugin for handling events being transported over Kafka</description> + + <properties> + <apex-plugins-event-carrier-kafka-dir>${project.basedir}/src</apex-plugins-event-carrier-kafka-dir> + </properties> + + <dependencies> + <dependency> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka-clients</artifactId> + <version>0.10.2.1</version> + </dependency> + </dependencies> +</project>
\ No newline at end of file diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumer.java b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumer.java new file mode 100644 index 000000000..3351a58e9 --- /dev/null +++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumer.java @@ -0,0 +1,198 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2016-2018 Ericsson. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.apex.plugins.event.carrier.kafka; + +import java.util.EnumMap; +import java.util.Map; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.onap.policy.apex.core.infrastructure.threading.ApplicationThreadFactory; +import org.onap.policy.apex.core.infrastructure.threading.ThreadUtilities; +import org.onap.policy.apex.service.engine.event.ApexEventConsumer; +import org.onap.policy.apex.service.engine.event.ApexEventException; +import org.onap.policy.apex.service.engine.event.ApexEventReceiver; +import org.onap.policy.apex.service.engine.event.PeeredReference; +import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerParameters; +import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerPeeredMode; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This class implements an Apex event consumer that receives events using Kafka. + * + * @author Liam Fallon (liam.fallon@ericsson.com) + */ +public class ApexKafkaConsumer implements ApexEventConsumer, Runnable { + // Get a reference to the logger + private static final Logger LOGGER = LoggerFactory.getLogger(ApexKafkaConsumer.class); + + // The Kafka parameters read from the parameter service + private KAFKACarrierTechnologyParameters kafkaConsumerProperties; + + // The event receiver that will receive events from this consumer + private ApexEventReceiver eventReceiver; + + // The Kafka consumer used to receive events using Kafka + private KafkaConsumer<String, String> kafkaConsumer; + + // The name for this consumer + private String name = null; + + // The peer references for this event handler + private Map<EventHandlerPeeredMode, PeeredReference> peerReferenceMap = new EnumMap<>(EventHandlerPeeredMode.class); + + // The consumer thread and stopping flag + private Thread consumerThread; + private boolean stopOrderedFlag = false; + + /* + * (non-Javadoc) + * + * @see org.onap.policy.apex.service.engine.event.ApexEventConsumer#init(java.lang.String, + * org.onap.policy.apex.service.parameters.eventhandler.EventHandlerParameters, + * org.onap.policy.apex.service.engine.event.ApexEventReceiver) + */ + @Override + public void init(final String consumerName, final EventHandlerParameters consumerParameters, + final ApexEventReceiver incomingEventReceiver) throws ApexEventException { + this.eventReceiver = incomingEventReceiver; + this.name = consumerName; + + // Check and get the Kafka Properties + if (!(consumerParameters.getCarrierTechnologyParameters() instanceof KAFKACarrierTechnologyParameters)) { + LOGGER.warn("specified consumer properties of type \"" + + consumerParameters.getCarrierTechnologyParameters().getClass().getCanonicalName() + + "\" are not applicable to a Kafka consumer"); + throw new ApexEventException("specified consumer properties of type \"" + + consumerParameters.getCarrierTechnologyParameters().getClass().getCanonicalName() + + "\" are not applicable to a Kafka consumer"); + } + kafkaConsumerProperties = + (KAFKACarrierTechnologyParameters) consumerParameters.getCarrierTechnologyParameters(); + + // Kick off the Kafka consumer + kafkaConsumer = new KafkaConsumer<String, String>(kafkaConsumerProperties.getKafkaConsumerProperties()); + kafkaConsumer.subscribe(kafkaConsumerProperties.getConsumerTopicList()); + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("event receiver for " + this.getClass().getName() + ":" + this.name + " subscribed to topics: " + + kafkaConsumerProperties.getConsumerTopicList()); + } + } + + /* + * (non-Javadoc) + * + * @see org.onap.policy.apex.service.engine.event.ApexEventConsumer#start() + */ + @Override + public void start() { + // Configure and start the event reception thread + final String threadName = this.getClass().getName() + ":" + this.name; + consumerThread = new ApplicationThreadFactory(threadName).newThread(this); + consumerThread.setDaemon(true); + consumerThread.start(); + } + + /* + * (non-Javadoc) + * + * @see org.onap.policy.apex.service.engine.event.ApexEventConsumer#getName() + */ + @Override + public String getName() { + return name; + } + + /* + * (non-Javadoc) + * + * @see org.onap.policy.apex.service.engine.event.ApexEventConsumer#getPeeredReference(org.onap.policy.apex.service. + * parameters.eventhandler.EventHandlerPeeredMode) + */ + @Override + public PeeredReference getPeeredReference(final EventHandlerPeeredMode peeredMode) { + return peerReferenceMap.get(peeredMode); + } + + /* + * (non-Javadoc) + * + * @see org.onap.policy.apex.service.engine.event.ApexEventConsumer#setPeeredReference(org.onap.policy.apex.service. + * parameters.eventhandler.EventHandlerPeeredMode, org.onap.policy.apex.service.engine.event.PeeredReference) + */ + @Override + public void setPeeredReference(final EventHandlerPeeredMode peeredMode, final PeeredReference peeredReference) { + peerReferenceMap.put(peeredMode, peeredReference); + } + + /* + * (non-Javadoc) + * + * @see java.lang.Runnable#run() + */ + @Override + public void run() { + // Kick off the Kafka consumer + kafkaConsumer = new KafkaConsumer<String, String>(kafkaConsumerProperties.getKafkaConsumerProperties()); + kafkaConsumer.subscribe(kafkaConsumerProperties.getConsumerTopicList()); + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("event receiver for " + this.getClass().getName() + ":" + this.name + " subscribed to topics: " + + kafkaConsumerProperties.getConsumerTopicList()); + } + + // The endless loop that receives events over Kafka + while (consumerThread.isAlive() && !stopOrderedFlag) { + try { + final ConsumerRecords<String, String> records = + kafkaConsumer.poll(kafkaConsumerProperties.getConsumerPollTime()); + for (final ConsumerRecord<String, String> record : records) { + if (LOGGER.isTraceEnabled()) { + LOGGER.trace("event received for {} for forwarding to Apex engine : {} {}", + this.getClass().getName() + ":" + this.name, record.key(), record.value()); + } + eventReceiver.receiveEvent(record.value()); + } + } catch (final Exception e) { + LOGGER.warn("error receiving events on thread {}", consumerThread.getName(), e); + } + } + + if (!consumerThread.isInterrupted()) { + kafkaConsumer.close(); + } + } + + /* + * (non-Javadoc) + * + * @see org.onap.policy.apex.service.engine.event.ApexEventConsumer#stop() + */ + @Override + public void stop() { + stopOrderedFlag = true; + + while (consumerThread.isAlive()) { + ThreadUtilities.sleep(kafkaConsumerProperties.getConsumerPollTime()); + } + } +} diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaProducer.java b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaProducer.java new file mode 100644 index 000000000..fb851bc70 --- /dev/null +++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaProducer.java @@ -0,0 +1,151 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2016-2018 Ericsson. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.apex.plugins.event.carrier.kafka; + +import java.util.EnumMap; +import java.util.Map; + +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.onap.policy.apex.service.engine.event.ApexEventException; +import org.onap.policy.apex.service.engine.event.ApexEventProducer; +import org.onap.policy.apex.service.engine.event.PeeredReference; +import org.onap.policy.apex.service.engine.event.SynchronousEventCache; +import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerParameters; +import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerPeeredMode; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Concrete implementation of an Apex event producer that sends events using Kafka. + * + * @author Liam Fallon (liam.fallon@ericsson.com) + */ +public class ApexKafkaProducer implements ApexEventProducer { + + // Get a reference to the logger + private static final Logger LOGGER = LoggerFactory.getLogger(ApexKafkaProducer.class); + + // The Kafka parameters read from the parameter service + private KAFKACarrierTechnologyParameters kafkaProducerProperties; + + // The Kafka Producer used to send events using Kafka + private Producer<String, Object> kafkaProducer; + + // The name for this producer + private String name = null; + + // The peer references for this event handler + private Map<EventHandlerPeeredMode, PeeredReference> peerReferenceMap = new EnumMap<>(EventHandlerPeeredMode.class); + + @Override + public void init(final String producerName, final EventHandlerParameters producerParameters) + throws ApexEventException { + this.name = producerName; + + // Check and get the Kafka Properties + if (!(producerParameters.getCarrierTechnologyParameters() instanceof KAFKACarrierTechnologyParameters)) { + LOGGER.warn("specified producer properties are not applicable to a Kafka producer (" + this.name + ")"); + throw new ApexEventException( + "specified producer properties are not applicable to a Kafka producer (" + this.name + ")"); + } + kafkaProducerProperties = + (KAFKACarrierTechnologyParameters) producerParameters.getCarrierTechnologyParameters(); + } + + /* + * (non-Javadoc) + * + * @see org.onap.policy.apex.service.engine.event.ApexEventProducer#getName() + */ + @Override + public String getName() { + return name; + } + + /* + * (non-Javadoc) + * + * @see org.onap.policy.apex.service.engine.event.ApexEventProducer#getPeeredReference(org.onap.policy.apex.service. + * parameters.eventhandler.EventHandlerPeeredMode) + */ + @Override + public PeeredReference getPeeredReference(final EventHandlerPeeredMode peeredMode) { + return peerReferenceMap.get(peeredMode); + } + + /* + * (non-Javadoc) + * + * @see org.onap.policy.apex.service.engine.event.ApexEventProducer#setPeeredReference(org.onap.policy.apex.service. + * parameters.eventhandler.EventHandlerPeeredMode, org.onap.policy.apex.service.engine.event.PeeredReference) + */ + @Override + public void setPeeredReference(final EventHandlerPeeredMode peeredMode, final PeeredReference peeredReference) { + peerReferenceMap.put(peeredMode, peeredReference); + } + + /* + * (non-Javadoc) + * + * @see org.onap.policy.apex.apps.uservice.producer.ApexEventProducer#sendEvent(long, java.lang.String, + * java.lang.Object) + */ + @Override + public void sendEvent(final long executionId, final String eventName, final Object event) { + // Check if this is a synchronized event, if so we have received a reply + final SynchronousEventCache synchronousEventCache = + (SynchronousEventCache) peerReferenceMap.get(EventHandlerPeeredMode.SYNCHRONOUS); + if (synchronousEventCache != null) { + synchronousEventCache.removeCachedEventToApexIfExists(executionId); + } + + // Kafka producer must be started in the same thread as it is stopped, so we must start it here + if (kafkaProducer == null) { + // Kick off the Kafka producer + kafkaProducer = new KafkaProducer<String, Object>(kafkaProducerProperties.getKafkaProducerProperties()); + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("event producer " + this.name + " is ready to send to topics: " + + kafkaProducerProperties.getProducerTopic()); + } + } + + kafkaProducer.send(new ProducerRecord<String, Object>(kafkaProducerProperties.getProducerTopic(), name, event)); + if (LOGGER.isTraceEnabled()) { + LOGGER.trace("event sent from engine using {} to topic {} : {} ", this.name, + kafkaProducerProperties.getProducerTopic(), event); + } + } + + /* + * (non-Javadoc) + * + * @see org.onap.policy.apex.apps.uservice.producer.ApexEventProducer#stop() + */ + @Override + public void stop() { + if (kafkaProducer != null) { + kafkaProducer.flush(); + kafkaProducer.close(); + } + } +} diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/KAFKACarrierTechnologyParameters.java b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/KAFKACarrierTechnologyParameters.java new file mode 100644 index 000000000..2357b1807 --- /dev/null +++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/KAFKACarrierTechnologyParameters.java @@ -0,0 +1,396 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2016-2018 Ericsson. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.apex.plugins.event.carrier.kafka; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Properties; + +import org.onap.policy.apex.service.parameters.carriertechnology.CarrierTechnologyParameters; + +/** + * Apex parameters for Kafka as an event carrier technology. + * + * @author Liam Fallon (liam.fallon@ericsson.com) + */ +public class KAFKACarrierTechnologyParameters extends CarrierTechnologyParameters { + // @formatter:off + /** The label of this carrier technology. */ + public static final String KAFKA_CARRIER_TECHNOLOGY_LABEL = "KAFKA"; + + /** The producer plugin class for the Kafka carrier technology. */ + public static final String KAFKA_EVENT_PRODUCER_PLUGIN_CLASS = ApexKafkaProducer.class.getCanonicalName(); + + /** The consumer plugin class for the Kafka carrier technology. */ + public static final String KAFKA_EVENT_CONSUMER_PLUGIN_CLASS = ApexKafkaConsumer.class.getCanonicalName(); + + // Default parameter values + private static final String DEFAULT_ACKS = "all"; + private static final String DEFAULT_BOOTSTRAP_SERVERS = "localhost:9092"; + private static final int DEFAULT_RETRIES = 0; + private static final int DEFAULT_BATCH_SIZE = 16384; + private static final int DEFAULT_LINGER_TIME = 1; + private static final long DEFAULT_BUFFER_MEMORY = 33554432; + private static final String DEFAULT_GROUP_ID = "default-group-id"; + private static final boolean DEFAULT_ENABLE_AUTO_COMMIT = true; + private static final int DEFAULT_AUTO_COMMIT_TIME = 1000; + private static final int DEFAULT_SESSION_TIMEOUT = 30000; + private static final String DEFAULT_PRODUCER_TOPIC = "apex-out"; + private static final int DEFAULT_CONSUMER_POLL_TIME = 100; + private static final String[] DEFAULT_CONSUMER_TOPIC_LIST = {"apex-in"}; + private static final String DEFAULT_KEY_SERIALIZER = "org.apache.kafka.common.serialization.StringSerializer"; + private static final String DEFAULT_VALUE_SERIALIZER = "org.apache.kafka.common.serialization.StringSerializer"; + private static final String DEFAULT_KEY_DESERIALIZER = "org.apache.kafka.common.serialization.StringDeserializer"; + private static final String DEFAULT_VALUE_DESERIALIZER = "org.apache.kafka.common.serialization.StringDeserializer"; + + // Parameter property map tokens + private static final String PROPERTY_BOOTSTRAP_SERVERS = "bootstrap.servers"; + private static final String PROPERTY_ACKS = "acks"; + private static final String PROPERTY_RETRIES = "retries"; + private static final String PROPERTY_BATCH_SIZE = "batch.size"; + private static final String PROPERTY_LINGER_TIME = "linger.ms"; + private static final String PROPERTY_BUFFER_MEMORY = "buffer.memory"; + private static final String PROPERTY_GROUP_ID = "group.id"; + private static final String PROPERTY_ENABLE_AUTO_COMMIT = "enable.auto.commit"; + private static final String PROPERTY_AUTO_COMMIT_TIME = "auto.commit.interval.ms"; + private static final String PROPERTY_SESSION_TIMEOUT = "session.timeout.ms"; + private static final String PROPERTY_KEY_SERIALIZER = "key.serializer"; + private static final String PROPERTY_VALUE_SERIALIZER = "value.serializer"; + private static final String PROPERTY_KEY_DESERIALIZER = "key.deserializer"; + private static final String PROPERTY_VALUE_DESERIALIZER = "value.deserializer"; + + // kafka carrier parameters + private String bootstrapServers = DEFAULT_BOOTSTRAP_SERVERS; + private String acks = DEFAULT_ACKS; + private int retries = DEFAULT_RETRIES; + private int batchSize = DEFAULT_BATCH_SIZE; + private int lingerTime = DEFAULT_LINGER_TIME; + private long bufferMemory = DEFAULT_BUFFER_MEMORY; + private String groupId = DEFAULT_GROUP_ID; + private boolean enableAutoCommit = DEFAULT_ENABLE_AUTO_COMMIT; + private int autoCommitTime = DEFAULT_AUTO_COMMIT_TIME; + private int sessionTimeout = DEFAULT_SESSION_TIMEOUT; + private String producerTopic = DEFAULT_PRODUCER_TOPIC; + private int consumerPollTime = DEFAULT_CONSUMER_POLL_TIME; + private String[] consumerTopicList = DEFAULT_CONSUMER_TOPIC_LIST; + private String keySerializer = DEFAULT_KEY_SERIALIZER; + private String valueSerializer = DEFAULT_VALUE_SERIALIZER; + private String keyDeserializer = DEFAULT_KEY_DESERIALIZER; + private String valueDeserializer = DEFAULT_VALUE_DESERIALIZER; + // @formatter:on + + /** + * Constructor to create a kafka carrier technology parameters instance and register the instance with the parameter + * service. + */ + public KAFKACarrierTechnologyParameters() { + super(KAFKACarrierTechnologyParameters.class.getCanonicalName()); + + // Set the carrier technology properties for the kafka carrier technology + this.setLabel(KAFKA_CARRIER_TECHNOLOGY_LABEL); + this.setEventProducerPluginClass(KAFKA_EVENT_PRODUCER_PLUGIN_CLASS); + this.setEventConsumerPluginClass(KAFKA_EVENT_CONSUMER_PLUGIN_CLASS); + } + + /** + * Gets the kafka producer properties. + * + * @return the kafka producer properties + */ + public Properties getKafkaProducerProperties() { + final Properties kafkaProperties = new Properties(); + + kafkaProperties.put(PROPERTY_BOOTSTRAP_SERVERS, bootstrapServers); + kafkaProperties.put(PROPERTY_ACKS, acks); + kafkaProperties.put(PROPERTY_RETRIES, retries); + kafkaProperties.put(PROPERTY_BATCH_SIZE, batchSize); + kafkaProperties.put(PROPERTY_LINGER_TIME, lingerTime); + kafkaProperties.put(PROPERTY_BUFFER_MEMORY, bufferMemory); + kafkaProperties.put(PROPERTY_KEY_SERIALIZER, keySerializer); + kafkaProperties.put(PROPERTY_VALUE_SERIALIZER, valueSerializer); + + return kafkaProperties; + } + + /** + * Gets the kafka consumer properties. + * + * @return the kafka consumer properties + */ + public Properties getKafkaConsumerProperties() { + final Properties kafkaProperties = new Properties(); + + kafkaProperties.put(PROPERTY_BOOTSTRAP_SERVERS, bootstrapServers); + kafkaProperties.put(PROPERTY_GROUP_ID, groupId); + kafkaProperties.put(PROPERTY_ENABLE_AUTO_COMMIT, enableAutoCommit); + kafkaProperties.put(PROPERTY_AUTO_COMMIT_TIME, autoCommitTime); + kafkaProperties.put(PROPERTY_SESSION_TIMEOUT, sessionTimeout); + kafkaProperties.put(PROPERTY_KEY_DESERIALIZER, keyDeserializer); + kafkaProperties.put(PROPERTY_VALUE_DESERIALIZER, valueDeserializer); + + return kafkaProperties; + } + + /** + * Gets the bootstrap servers. + * + * @return the bootstrap servers + */ + public String getBootstrapServers() { + return bootstrapServers; + } + + /** + * Gets the acks. + * + * @return the acks + */ + public String getAcks() { + return acks; + } + + /** + * Gets the retries. + * + * @return the retries + */ + public int getRetries() { + return retries; + } + + /** + * Gets the batch size. + * + * @return the batch size + */ + public int getBatchSize() { + return batchSize; + } + + /** + * Gets the linger time. + * + * @return the linger time + */ + public int getLingerTime() { + return lingerTime; + } + + /** + * Gets the buffer memory. + * + * @return the buffer memory + */ + public long getBufferMemory() { + return bufferMemory; + } + + /** + * Gets the group id. + * + * @return the group id + */ + public String getGroupId() { + return groupId; + } + + /** + * Checks if is enable auto commit. + * + * @return true, if checks if is enable auto commit + */ + public boolean isEnableAutoCommit() { + return enableAutoCommit; + } + + /** + * Gets the auto commit time. + * + * @return the auto commit time + */ + public int getAutoCommitTime() { + return autoCommitTime; + } + + /** + * Gets the session timeout. + * + * @return the session timeout + */ + public int getSessionTimeout() { + return sessionTimeout; + } + + /** + * Gets the producer topic. + * + * @return the producer topic + */ + public String getProducerTopic() { + return producerTopic; + } + + /** + * Gets the consumer poll time. + * + * @return the consumer poll time + */ + public long getConsumerPollTime() { + return consumerPollTime; + } + + /** + * Gets the consumer topic list. + * + * @return the consumer topic list + */ + public Collection<String> getConsumerTopicList() { + return Arrays.asList(consumerTopicList); + } + + /** + * Gets the key serializer. + * + * @return the key serializer + */ + public String getKeySerializer() { + return keySerializer; + } + + /** + * Gets the value serializer. + * + * @return the value serializer + */ + public String getValueSerializer() { + return valueSerializer; + } + + /** + * Gets the key deserializer. + * + * @return the key deserializer + */ + public String getKeyDeserializer() { + return keyDeserializer; + } + + /** + * Gets the value deserializer. + * + * @return the value deserializer + */ + public String getValueDeserializer() { + return valueDeserializer; + } + + /* + * (non-Javadoc) + * + * @see org.onap.policy.apex.apps.uservice.parameters.ApexParameterValidator#validate() + */ + @Override + public String validate() { + final StringBuilder errorMessageBuilder = new StringBuilder(); + + errorMessageBuilder.append(super.validate()); + + if (bootstrapServers == null || bootstrapServers.trim().length() == 0) { + errorMessageBuilder + .append(" bootstrapServers not specified, must be specified as a string of form host:port\n"); + } + + if (acks == null || acks.trim().length() == 0) { + errorMessageBuilder.append(" acks not specified, must be specified as a string with values [0|1|all]\n"); + } + + if (retries < 0) { + errorMessageBuilder.append(" retries [" + retries + "] invalid, must be specified as retries >= 0\n"); + } + + if (batchSize < 0) { + errorMessageBuilder + .append(" batchSize [" + batchSize + "] invalid, must be specified as batchSize >= 0\n"); + } + + if (lingerTime < 0) { + errorMessageBuilder + .append(" lingerTime [" + lingerTime + "] invalid, must be specified as lingerTime >= 0\n"); + } + + if (bufferMemory < 0) { + errorMessageBuilder + .append(" bufferMemory [" + bufferMemory + "] invalid, must be specified as bufferMemory >= 0\n"); + } + + if (groupId == null || groupId.trim().length() == 0) { + errorMessageBuilder.append(" groupId not specified, must be specified as a string\n"); + } + + if (autoCommitTime < 0) { + errorMessageBuilder.append( + " autoCommitTime [" + autoCommitTime + "] invalid, must be specified as autoCommitTime >= 0\n"); + } + + if (sessionTimeout < 0) { + errorMessageBuilder.append( + " sessionTimeout [" + sessionTimeout + "] invalid, must be specified as sessionTimeout >= 0\n"); + } + + if (producerTopic == null || producerTopic.trim().length() == 0) { + errorMessageBuilder.append(" producerTopic not specified, must be specified as a string\n"); + } + + if (consumerPollTime < 0) { + errorMessageBuilder.append(" consumerPollTime [" + consumerPollTime + + "] invalid, must be specified as consumerPollTime >= 0\n"); + } + + if (consumerTopicList == null || consumerTopicList.length == 0) { + errorMessageBuilder.append(" consumerTopicList not specified, must be specified as a list of strings\n"); + } + + for (final String consumerTopic : consumerTopicList) { + if (consumerTopic == null || consumerTopic.trim().length() == 0) { + errorMessageBuilder.append(" invalid consumer topic \"" + consumerTopic + + "\" specified on consumerTopicList, consumer topics must be specified as strings\n"); + } + } + + if (keySerializer == null || keySerializer.trim().length() == 0) { + errorMessageBuilder.append(" keySerializer not specified, must be specified as a string\n"); + } + + if (valueSerializer == null || valueSerializer.trim().length() == 0) { + errorMessageBuilder.append(" valueSerializer not specified, must be specified as a string\n"); + } + + if (keyDeserializer == null || keyDeserializer.trim().length() == 0) { + errorMessageBuilder.append(" keyDeserializer not specified, must be specified as a string\n"); + } + + if (valueDeserializer == null || valueDeserializer.trim().length() == 0) { + errorMessageBuilder.append(" valueDeserializer not specified, must be specified as a string\n"); + } + + return errorMessageBuilder.toString(); + } +} diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/package-info.java b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/package-info.java new file mode 100644 index 000000000..f04aaceef --- /dev/null +++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/package-info.java @@ -0,0 +1,27 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2016-2018 Ericsson. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +/** + * Implements the APEX event carrier technology plugin for Kafka. + * + * @author Liam Fallon (liam.fallon@ericsson.com) + */ + +package org.onap.policy.apex.plugins.event.carrier.kafka; |