summaryrefslogtreecommitdiffstats
path: root/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src')
-rw-r--r--plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumer.java17
-rw-r--r--plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/KafkaCarrierTechnologyParameters.java67
2 files changed, 59 insertions, 25 deletions
diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumer.java b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumer.java
index 94ac2bcca..affd10ccb 100644
--- a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumer.java
+++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumer.java
@@ -166,10 +166,7 @@ public class ApexKafkaConsumer implements ApexEventConsumer, Runnable {
final ConsumerRecords<String, String> records =
kafkaConsumer.poll(kafkaConsumerProperties.getConsumerPollDuration().toMillis());
for (final ConsumerRecord<String, String> record : records) {
- if (LOGGER.isTraceEnabled()) {
- LOGGER.trace("event received for {} for forwarding to Apex engine : {} {}",
- this.getClass().getName() + ":" + this.name, record.key(), record.value());
- }
+ traceIfTraceEnabled(record);
eventReceiver.receiveEvent(record.value());
}
} catch (final Exception e) {
@@ -182,6 +179,18 @@ public class ApexKafkaConsumer implements ApexEventConsumer, Runnable {
}
}
+ /**
+ * Trace a record if trace is enabled.
+ *
+ * @param record the record to trace
+ */
+ private void traceIfTraceEnabled(final ConsumerRecord<String, String> record) {
+ if (LOGGER.isTraceEnabled()) {
+ LOGGER.trace("event received for {} for forwarding to Apex engine : {} {}",
+ this.getClass().getName() + ":" + this.name, record.key(), record.value());
+ }
+ }
+
/*
* (non-Javadoc)
*
diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/KafkaCarrierTechnologyParameters.java b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/KafkaCarrierTechnologyParameters.java
index 7c24ce1aa..851741611 100644
--- a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/KafkaCarrierTechnologyParameters.java
+++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/KafkaCarrierTechnologyParameters.java
@@ -62,10 +62,8 @@ public class KafkaCarrierTechnologyParameters extends CarrierTechnologyParameter
private static final String DEFAULT_PROD_TOPIC = "apex-out";
private static final int DEFAULT_CONS_POLL_TIME = 100;
private static final String[] DEFAULT_CONS_TOPICLIST = {"apex-in"};
- private static final String DEFAULT_KEY_SERZER = "org.apache.kafka.common.serialization.StringSerializer";
- private static final String DEFAULT_VAL_SERZER = "org.apache.kafka.common.serialization.StringSerializer";
- private static final String DEFAULT_KEY_DESZER = "org.apache.kafka.common.serialization.StringDeserializer";
- private static final String DEFAULT_VALUE_DESZER = "org.apache.kafka.common.serialization.StringDeserializer";
+ private static final String DEFAULT_STRING_SERZER = "org.apache.kafka.common.serialization.StringSerializer";
+ private static final String DEFAULT_STRING_DESZER = "org.apache.kafka.common.serialization.StringDeserializer";
// Parameter property map tokens
private static final String PROPERTY_BOOTSTRAP_SERVERS = "bootstrap.servers";
@@ -97,10 +95,10 @@ public class KafkaCarrierTechnologyParameters extends CarrierTechnologyParameter
private String producerTopic = DEFAULT_PROD_TOPIC;
private int consumerPollTime = DEFAULT_CONS_POLL_TIME;
private String[] consumerTopicList = DEFAULT_CONS_TOPICLIST;
- private String keySerializer = DEFAULT_KEY_SERZER;
- private String valueSerializer = DEFAULT_VAL_SERZER;
- private String keyDeserializer = DEFAULT_KEY_DESZER;
- private String valueDeserializer = DEFAULT_VALUE_DESZER;
+ private String keySerializer = DEFAULT_STRING_SERZER;
+ private String valueSerializer = DEFAULT_STRING_SERZER;
+ private String keyDeserializer = DEFAULT_STRING_DESZER;
+ private String valueDeserializer = DEFAULT_STRING_DESZER;
// @formatter:on
/**
@@ -325,6 +323,23 @@ public class KafkaCarrierTechnologyParameters extends CarrierTechnologyParameter
public GroupValidationResult validate() {
final GroupValidationResult result = super.validate();
+ validateStringParameters(result);
+
+ validateNumericParameters(result);
+
+ validateConsumerTopicList(result);
+
+ validateSerializersAndDeserializers(result);
+
+ return result;
+ }
+
+ /**
+ * Validate that string parameters are correct.
+ *
+ * @param result the result of the validation
+ */
+ private void validateStringParameters(final GroupValidationResult result) {
if (isNullOrBlank(bootstrapServers)) {
result.setResult("bootstrapServers", ValidationStatus.INVALID,
"not specified, must be specified as a string of form host:port");
@@ -335,6 +350,22 @@ public class KafkaCarrierTechnologyParameters extends CarrierTechnologyParameter
"not specified, must be specified as a string with values [0|1|all]");
}
+ if (isNullOrBlank(groupId)) {
+ result.setResult("groupId", ValidationStatus.INVALID, SPECIFY_AS_STRING_MESSAGE);
+ }
+
+ if (isNullOrBlank(producerTopic)) {
+ result.setResult("producerTopic", ValidationStatus.INVALID,
+ SPECIFY_AS_STRING_MESSAGE);
+ }
+ }
+
+ /**
+ * Check if numeric parameters are valid.
+ *
+ * @param result the result of the validation
+ */
+ private void validateNumericParameters(final GroupValidationResult result) {
if (retries < 0) {
result.setResult(PROPERTY_RETRIES, ValidationStatus.INVALID,
"[" + retries + "] invalid, must be specified as retries >= 0");
@@ -355,10 +386,6 @@ public class KafkaCarrierTechnologyParameters extends CarrierTechnologyParameter
"[" + bufferMemory + "] invalid, must be specified as bufferMemory >= 0");
}
- if (isNullOrBlank(groupId)) {
- result.setResult("groupId", ValidationStatus.INVALID, SPECIFY_AS_STRING_MESSAGE);
- }
-
if (autoCommitTime < 0) {
result.setResult("autoCommitTime", ValidationStatus.INVALID,
"[" + autoCommitTime + "] invalid, must be specified as autoCommitTime >= 0");
@@ -369,18 +396,18 @@ public class KafkaCarrierTechnologyParameters extends CarrierTechnologyParameter
"[" + sessionTimeout + "] invalid, must be specified as sessionTimeout >= 0");
}
- if (isNullOrBlank(producerTopic)) {
- result.setResult("producerTopic", ValidationStatus.INVALID,
- SPECIFY_AS_STRING_MESSAGE);
- }
-
if (consumerPollTime < 0) {
result.setResult("consumerPollTime", ValidationStatus.INVALID,
"[" + consumerPollTime + "] invalid, must be specified as consumerPollTime >= 0");
}
+ }
- validateConsumerTopicList(result);
-
+ /**
+ * Validate the serializers and deserializers.
+ *
+ * @param result the result of the validation.
+ */
+ private void validateSerializersAndDeserializers(final GroupValidationResult result) {
if (isNullOrBlank(keySerializer)) {
result.setResult("keySerializer", ValidationStatus.INVALID,
SPECIFY_AS_STRING_MESSAGE);
@@ -400,8 +427,6 @@ public class KafkaCarrierTechnologyParameters extends CarrierTechnologyParameter
result.setResult("valueDeserializer", ValidationStatus.INVALID,
SPECIFY_AS_STRING_MESSAGE);
}
-
- return result;
}
private void validateConsumerTopicList(final GroupValidationResult result) {