diff options
Diffstat (limited to 'plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka')
2 files changed, 59 insertions, 25 deletions
diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumer.java b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumer.java index 94ac2bcca..affd10ccb 100644 --- a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumer.java +++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumer.java @@ -166,10 +166,7 @@ public class ApexKafkaConsumer implements ApexEventConsumer, Runnable { final ConsumerRecords<String, String> records = kafkaConsumer.poll(kafkaConsumerProperties.getConsumerPollDuration().toMillis()); for (final ConsumerRecord<String, String> record : records) { - if (LOGGER.isTraceEnabled()) { - LOGGER.trace("event received for {} for forwarding to Apex engine : {} {}", - this.getClass().getName() + ":" + this.name, record.key(), record.value()); - } + traceIfTraceEnabled(record); eventReceiver.receiveEvent(record.value()); } } catch (final Exception e) { @@ -182,6 +179,18 @@ public class ApexKafkaConsumer implements ApexEventConsumer, Runnable { } } + /** + * Trace a record if trace is enabled. + * + * @param record the record to trace + */ + private void traceIfTraceEnabled(final ConsumerRecord<String, String> record) { + if (LOGGER.isTraceEnabled()) { + LOGGER.trace("event received for {} for forwarding to Apex engine : {} {}", + this.getClass().getName() + ":" + this.name, record.key(), record.value()); + } + } + /* * (non-Javadoc) * diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/KafkaCarrierTechnologyParameters.java b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/KafkaCarrierTechnologyParameters.java index 7c24ce1aa..851741611 100644 --- a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/KafkaCarrierTechnologyParameters.java +++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/KafkaCarrierTechnologyParameters.java @@ -62,10 +62,8 @@ public class KafkaCarrierTechnologyParameters extends CarrierTechnologyParameter private static final String DEFAULT_PROD_TOPIC = "apex-out"; private static final int DEFAULT_CONS_POLL_TIME = 100; private static final String[] DEFAULT_CONS_TOPICLIST = {"apex-in"}; - private static final String DEFAULT_KEY_SERZER = "org.apache.kafka.common.serialization.StringSerializer"; - private static final String DEFAULT_VAL_SERZER = "org.apache.kafka.common.serialization.StringSerializer"; - private static final String DEFAULT_KEY_DESZER = "org.apache.kafka.common.serialization.StringDeserializer"; - private static final String DEFAULT_VALUE_DESZER = "org.apache.kafka.common.serialization.StringDeserializer"; + private static final String DEFAULT_STRING_SERZER = "org.apache.kafka.common.serialization.StringSerializer"; + private static final String DEFAULT_STRING_DESZER = "org.apache.kafka.common.serialization.StringDeserializer"; // Parameter property map tokens private static final String PROPERTY_BOOTSTRAP_SERVERS = "bootstrap.servers"; @@ -97,10 +95,10 @@ public class KafkaCarrierTechnologyParameters extends CarrierTechnologyParameter private String producerTopic = DEFAULT_PROD_TOPIC; private int consumerPollTime = DEFAULT_CONS_POLL_TIME; private String[] consumerTopicList = DEFAULT_CONS_TOPICLIST; - private String keySerializer = DEFAULT_KEY_SERZER; - private String valueSerializer = DEFAULT_VAL_SERZER; - private String keyDeserializer = DEFAULT_KEY_DESZER; - private String valueDeserializer = DEFAULT_VALUE_DESZER; + private String keySerializer = DEFAULT_STRING_SERZER; + private String valueSerializer = DEFAULT_STRING_SERZER; + private String keyDeserializer = DEFAULT_STRING_DESZER; + private String valueDeserializer = DEFAULT_STRING_DESZER; // @formatter:on /** @@ -325,6 +323,23 @@ public class KafkaCarrierTechnologyParameters extends CarrierTechnologyParameter public GroupValidationResult validate() { final GroupValidationResult result = super.validate(); + validateStringParameters(result); + + validateNumericParameters(result); + + validateConsumerTopicList(result); + + validateSerializersAndDeserializers(result); + + return result; + } + + /** + * Validate that string parameters are correct. + * + * @param result the result of the validation + */ + private void validateStringParameters(final GroupValidationResult result) { if (isNullOrBlank(bootstrapServers)) { result.setResult("bootstrapServers", ValidationStatus.INVALID, "not specified, must be specified as a string of form host:port"); @@ -335,6 +350,22 @@ public class KafkaCarrierTechnologyParameters extends CarrierTechnologyParameter "not specified, must be specified as a string with values [0|1|all]"); } + if (isNullOrBlank(groupId)) { + result.setResult("groupId", ValidationStatus.INVALID, SPECIFY_AS_STRING_MESSAGE); + } + + if (isNullOrBlank(producerTopic)) { + result.setResult("producerTopic", ValidationStatus.INVALID, + SPECIFY_AS_STRING_MESSAGE); + } + } + + /** + * Check if numeric parameters are valid. + * + * @param result the result of the validation + */ + private void validateNumericParameters(final GroupValidationResult result) { if (retries < 0) { result.setResult(PROPERTY_RETRIES, ValidationStatus.INVALID, "[" + retries + "] invalid, must be specified as retries >= 0"); @@ -355,10 +386,6 @@ public class KafkaCarrierTechnologyParameters extends CarrierTechnologyParameter "[" + bufferMemory + "] invalid, must be specified as bufferMemory >= 0"); } - if (isNullOrBlank(groupId)) { - result.setResult("groupId", ValidationStatus.INVALID, SPECIFY_AS_STRING_MESSAGE); - } - if (autoCommitTime < 0) { result.setResult("autoCommitTime", ValidationStatus.INVALID, "[" + autoCommitTime + "] invalid, must be specified as autoCommitTime >= 0"); @@ -369,18 +396,18 @@ public class KafkaCarrierTechnologyParameters extends CarrierTechnologyParameter "[" + sessionTimeout + "] invalid, must be specified as sessionTimeout >= 0"); } - if (isNullOrBlank(producerTopic)) { - result.setResult("producerTopic", ValidationStatus.INVALID, - SPECIFY_AS_STRING_MESSAGE); - } - if (consumerPollTime < 0) { result.setResult("consumerPollTime", ValidationStatus.INVALID, "[" + consumerPollTime + "] invalid, must be specified as consumerPollTime >= 0"); } + } - validateConsumerTopicList(result); - + /** + * Validate the serializers and deserializers. + * + * @param result the result of the validation. + */ + private void validateSerializersAndDeserializers(final GroupValidationResult result) { if (isNullOrBlank(keySerializer)) { result.setResult("keySerializer", ValidationStatus.INVALID, SPECIFY_AS_STRING_MESSAGE); @@ -400,8 +427,6 @@ public class KafkaCarrierTechnologyParameters extends CarrierTechnologyParameter result.setResult("valueDeserializer", ValidationStatus.INVALID, SPECIFY_AS_STRING_MESSAGE); } - - return result; } private void validateConsumerTopicList(final GroupValidationResult result) { |