aboutsummaryrefslogtreecommitdiffstats
path: root/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka')
-rw-r--r--plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumer.java10
-rw-r--r--plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaProducer.java13
-rw-r--r--plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/KafkaCarrierTechnologyParameters.java (renamed from plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/KAFKACarrierTechnologyParameters.java)60
3 files changed, 42 insertions, 41 deletions
diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumer.java b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumer.java
index 3351a58e9..dfb12617c 100644
--- a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumer.java
+++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumer.java
@@ -47,7 +47,7 @@ public class ApexKafkaConsumer implements ApexEventConsumer, Runnable {
private static final Logger LOGGER = LoggerFactory.getLogger(ApexKafkaConsumer.class);
// The Kafka parameters read from the parameter service
- private KAFKACarrierTechnologyParameters kafkaConsumerProperties;
+ private KafkaCarrierTechnologyParameters kafkaConsumerProperties;
// The event receiver that will receive events from this consumer
private ApexEventReceiver eventReceiver;
@@ -79,7 +79,7 @@ public class ApexKafkaConsumer implements ApexEventConsumer, Runnable {
this.name = consumerName;
// Check and get the Kafka Properties
- if (!(consumerParameters.getCarrierTechnologyParameters() instanceof KAFKACarrierTechnologyParameters)) {
+ if (!(consumerParameters.getCarrierTechnologyParameters() instanceof KafkaCarrierTechnologyParameters)) {
LOGGER.warn("specified consumer properties of type \""
+ consumerParameters.getCarrierTechnologyParameters().getClass().getCanonicalName()
+ "\" are not applicable to a Kafka consumer");
@@ -88,10 +88,10 @@ public class ApexKafkaConsumer implements ApexEventConsumer, Runnable {
+ "\" are not applicable to a Kafka consumer");
}
kafkaConsumerProperties =
- (KAFKACarrierTechnologyParameters) consumerParameters.getCarrierTechnologyParameters();
+ (KafkaCarrierTechnologyParameters) consumerParameters.getCarrierTechnologyParameters();
// Kick off the Kafka consumer
- kafkaConsumer = new KafkaConsumer<String, String>(kafkaConsumerProperties.getKafkaConsumerProperties());
+ kafkaConsumer = new KafkaConsumer<>(kafkaConsumerProperties.getKafkaConsumerProperties());
kafkaConsumer.subscribe(kafkaConsumerProperties.getConsumerTopicList());
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("event receiver for " + this.getClass().getName() + ":" + this.name + " subscribed to topics: "
@@ -153,7 +153,7 @@ public class ApexKafkaConsumer implements ApexEventConsumer, Runnable {
@Override
public void run() {
// Kick off the Kafka consumer
- kafkaConsumer = new KafkaConsumer<String, String>(kafkaConsumerProperties.getKafkaConsumerProperties());
+ kafkaConsumer = new KafkaConsumer<>(kafkaConsumerProperties.getKafkaConsumerProperties());
kafkaConsumer.subscribe(kafkaConsumerProperties.getConsumerTopicList());
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("event receiver for " + this.getClass().getName() + ":" + this.name + " subscribed to topics: "
diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaProducer.java b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaProducer.java
index fb851bc70..c83c0ae1e 100644
--- a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaProducer.java
+++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaProducer.java
@@ -46,7 +46,7 @@ public class ApexKafkaProducer implements ApexEventProducer {
private static final Logger LOGGER = LoggerFactory.getLogger(ApexKafkaProducer.class);
// The Kafka parameters read from the parameter service
- private KAFKACarrierTechnologyParameters kafkaProducerProperties;
+ private KafkaCarrierTechnologyParameters kafkaProducerProperties;
// The Kafka Producer used to send events using Kafka
private Producer<String, Object> kafkaProducer;
@@ -63,13 +63,14 @@ public class ApexKafkaProducer implements ApexEventProducer {
this.name = producerName;
// Check and get the Kafka Properties
- if (!(producerParameters.getCarrierTechnologyParameters() instanceof KAFKACarrierTechnologyParameters)) {
- LOGGER.warn("specified producer properties are not applicable to a Kafka producer (" + this.name + ")");
+ if (!(producerParameters.getCarrierTechnologyParameters() instanceof KafkaCarrierTechnologyParameters)) {
+ String message = "specified producer properties are not applicable to a Kafka producer (" + this.name + ")";
+ LOGGER.warn(message);
throw new ApexEventException(
- "specified producer properties are not applicable to a Kafka producer (" + this.name + ")");
+ message);
}
kafkaProducerProperties =
- (KAFKACarrierTechnologyParameters) producerParameters.getCarrierTechnologyParameters();
+ (KafkaCarrierTechnologyParameters) producerParameters.getCarrierTechnologyParameters();
}
/*
@@ -122,7 +123,7 @@ public class ApexKafkaProducer implements ApexEventProducer {
// Kafka producer must be started in the same thread as it is stopped, so we must start it here
if (kafkaProducer == null) {
// Kick off the Kafka producer
- kafkaProducer = new KafkaProducer<String, Object>(kafkaProducerProperties.getKafkaProducerProperties());
+ kafkaProducer = new KafkaProducer<>(kafkaProducerProperties.getKafkaProducerProperties());
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("event producer " + this.name + " is ready to send to topics: "
+ kafkaProducerProperties.getProducerTopic());
diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/KAFKACarrierTechnologyParameters.java b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/KafkaCarrierTechnologyParameters.java
index 5ce96662e..9d7cc77f3 100644
--- a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/KAFKACarrierTechnologyParameters.java
+++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/KafkaCarrierTechnologyParameters.java
@@ -33,7 +33,7 @@ import org.onap.policy.common.parameters.ValidationStatus;
*
* @author Liam Fallon (liam.fallon@ericsson.com)
*/
-public class KAFKACarrierTechnologyParameters extends CarrierTechnologyParameters {
+public class KafkaCarrierTechnologyParameters extends CarrierTechnologyParameters {
// @formatter:off
/** The label of this carrier technology. */
public static final String KAFKA_CARRIER_TECHNOLOGY_LABEL = "KAFKA";
@@ -48,23 +48,23 @@ public class KAFKACarrierTechnologyParameters extends CarrierTechnologyParameter
private static final String SPECIFY_AS_STRING_MESSAGE = "not specified, must be specified as a string";
// Default parameter values
- private static final String DEFAULT_ACKS = "all";
- private static final String DEFAULT_BOOTSTRAP_SERVERS = "localhost:9092";
- private static final int DEFAULT_RETRIES = 0;
- private static final int DEFAULT_BATCH_SIZE = 16384;
- private static final int DEFAULT_LINGER_TIME = 1;
- private static final long DEFAULT_BUFFER_MEMORY = 33554432;
- private static final String DEFAULT_GROUP_ID = "default-group-id";
- private static final boolean DEFAULT_ENABLE_AUTO_COMMIT = true;
- private static final int DEFAULT_AUTO_COMMIT_TIME = 1000;
- private static final int DEFAULT_SESSION_TIMEOUT = 30000;
- private static final String DEFAULT_PRODUCER_TOPIC = "apex-out";
- private static final int DEFAULT_CONSUMER_POLL_TIME = 100;
- private static final String[] DEFAULT_CONSUMER_TOPIC_LIST = {"apex-in"};
- private static final String DEFAULT_KEY_SERIALIZER = "org.apache.kafka.common.serialization.StringSerializer";
- private static final String DEFAULT_VALUE_SERIALIZER = "org.apache.kafka.common.serialization.StringSerializer";
- private static final String DEFAULT_KEY_DESERIALIZER = "org.apache.kafka.common.serialization.StringDeserializer";
- private static final String DEFAULT_VALUE_DESERIALIZER = "org.apache.kafka.common.serialization.StringDeserializer";
+ private static final String DEFAULT_ACKS = "all";
+ private static final String DEFAULT_BOOT_SERVERS = "localhost:9092";
+ private static final int DEFAULT_RETRIES = 0;
+ private static final int DEFAULT_BATCH_SIZE = 16384;
+ private static final int DEFAULT_LINGER_TIME = 1;
+ private static final long DEFAULT_BUFFER_MEMORY = 33554432;
+ private static final String DEFAULT_GROUP_ID = "default-group-id";
+ private static final boolean DEFAULT_ENABLE_AUTOCMIT = true;
+ private static final int DEFAULT_AUTO_COMMIT_TIME = 1000;
+ private static final int DEFAULT_SESSION_TIMEOUT = 30000;
+ private static final String DEFAULT_PROD_TOPIC = "apex-out";
+ private static final int DEFAULT_CONS_POLL_TIME = 100;
+ private static final String[] DEFAULT_CONS_TOPICLIST = {"apex-in"};
+ private static final String DEFAULT_KEY_SERZER = "org.apache.kafka.common.serialization.StringSerializer";
+ private static final String DEFAULT_VAL_SERZER = "org.apache.kafka.common.serialization.StringSerializer";
+ private static final String DEFAULT_KEY_DESZER = "org.apache.kafka.common.serialization.StringDeserializer";
+ private static final String DEFAULT_VALUE_DESZER = "org.apache.kafka.common.serialization.StringDeserializer";
// Parameter property map tokens
private static final String PROPERTY_BOOTSTRAP_SERVERS = "bootstrap.servers";
@@ -83,30 +83,30 @@ public class KAFKACarrierTechnologyParameters extends CarrierTechnologyParameter
private static final String PROPERTY_VALUE_DESERIALIZER = "value.deserializer";
// kafka carrier parameters
- private String bootstrapServers = DEFAULT_BOOTSTRAP_SERVERS;
+ private String bootstrapServers = DEFAULT_BOOT_SERVERS;
private String acks = DEFAULT_ACKS;
private int retries = DEFAULT_RETRIES;
private int batchSize = DEFAULT_BATCH_SIZE;
private int lingerTime = DEFAULT_LINGER_TIME;
private long bufferMemory = DEFAULT_BUFFER_MEMORY;
private String groupId = DEFAULT_GROUP_ID;
- private boolean enableAutoCommit = DEFAULT_ENABLE_AUTO_COMMIT;
+ private boolean enableAutoCommit = DEFAULT_ENABLE_AUTOCMIT;
private int autoCommitTime = DEFAULT_AUTO_COMMIT_TIME;
private int sessionTimeout = DEFAULT_SESSION_TIMEOUT;
- private String producerTopic = DEFAULT_PRODUCER_TOPIC;
- private int consumerPollTime = DEFAULT_CONSUMER_POLL_TIME;
- private String[] consumerTopicList = DEFAULT_CONSUMER_TOPIC_LIST;
- private String keySerializer = DEFAULT_KEY_SERIALIZER;
- private String valueSerializer = DEFAULT_VALUE_SERIALIZER;
- private String keyDeserializer = DEFAULT_KEY_DESERIALIZER;
- private String valueDeserializer = DEFAULT_VALUE_DESERIALIZER;
+ private String producerTopic = DEFAULT_PROD_TOPIC;
+ private int consumerPollTime = DEFAULT_CONS_POLL_TIME;
+ private String[] consumerTopicList = DEFAULT_CONS_TOPICLIST;
+ private String keySerializer = DEFAULT_KEY_SERZER;
+ private String valueSerializer = DEFAULT_VAL_SERZER;
+ private String keyDeserializer = DEFAULT_KEY_DESZER;
+ private String valueDeserializer = DEFAULT_VALUE_DESZER;
// @formatter:on
/**
* Constructor to create a kafka carrier technology parameters instance and register the instance with the parameter
* service.
*/
- public KAFKACarrierTechnologyParameters() {
+ public KafkaCarrierTechnologyParameters() {
super();
// Set the carrier technology properties for the kafka carrier technology
@@ -394,7 +394,7 @@ public class KAFKACarrierTechnologyParameters extends CarrierTechnologyParameter
return result;
}
-
+
private void validateConsumerTopicList(final GroupValidationResult result) {
if (consumerTopicList == null || consumerTopicList.length == 0) {
result.setResult("consumerTopicList", ValidationStatus.INVALID,
@@ -414,6 +414,6 @@ public class KAFKACarrierTechnologyParameters extends CarrierTechnologyParameter
}
private boolean isNullOrBlank(final String stringValue) {
- return stringValue == null || stringValue.trim().length() == 0;
+ return stringValue == null || stringValue.trim().length() == 0;
}
}