diff options
Diffstat (limited to 'plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka')
2 files changed, 16 insertions, 15 deletions
diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumer.java b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumer.java index 2957a1a11..95379d457 100644 --- a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumer.java +++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumer.java @@ -3,6 +3,7 @@ * Copyright (C) 2016-2018 Ericsson. All rights reserved. * Modifications Copyright (C) 2019-2020 Nordix Foundation. * Modifications Copyright (C) 2021 Bell Canada. All rights reserved. + * Modifications Copyright (C) 2021 AT&T Intellectual Property. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -87,9 +88,9 @@ public class ApexKafkaConsumer extends ApexPluginsEventConsumer { try { final ConsumerRecords<String, String> records = kafkaConsumer.poll(kafkaConsumerProperties.getConsumerPollDuration()); - for (final ConsumerRecord<String, String> record : records) { - traceIfTraceEnabled(record); - eventReceiver.receiveEvent(new Properties(), record.value()); + for (final ConsumerRecord<String, String> dataRecord : records) { + traceIfTraceEnabled(dataRecord); + eventReceiver.receiveEvent(new Properties(), dataRecord.value()); } } catch (final Exception e) { LOGGER.debug("error receiving events on thread {}", consumerThread.getName(), e); @@ -101,12 +102,12 @@ public class ApexKafkaConsumer extends ApexPluginsEventConsumer { /** * Trace a record if trace is enabled. * - * @param record the record to trace + * @param dataRecord the record to trace */ - private void traceIfTraceEnabled(final ConsumerRecord<String, String> record) { + private void traceIfTraceEnabled(final ConsumerRecord<String, String> dataRecord) { if (LOGGER.isTraceEnabled()) { LOGGER.trace("event received for {} for forwarding to Apex engine : {} {}", - this.getClass().getName() + ":" + this.name, record.key(), record.value()); + this.getClass().getName() + ":" + this.name, dataRecord.key(), dataRecord.value()); } } diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/KafkaCarrierTechnologyParameters.java b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/KafkaCarrierTechnologyParameters.java index 475017283..a599307ff 100644 --- a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/KafkaCarrierTechnologyParameters.java +++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/KafkaCarrierTechnologyParameters.java @@ -158,11 +158,11 @@ public class KafkaCarrierTechnologyParameters extends CarrierTechnologyParameter * @return the kafka producer properties */ public Properties getKafkaProducerProperties() { - final Properties retKafkaProps = new Properties(); + final var retKafkaProps = new Properties(); // Add properties from the Kafka property array if (kafkaProperties != null) { - for (int i = 0; i < kafkaProperties.length; i++) { + for (var i = 0; i < kafkaProperties.length; i++) { retKafkaProps.setProperty(kafkaProperties[i][0], kafkaProperties[i][1]); } } @@ -188,11 +188,11 @@ public class KafkaCarrierTechnologyParameters extends CarrierTechnologyParameter * @return the kafka consumer properties */ public Properties getKafkaConsumerProperties() { - final Properties retKafkaProps = new Properties(); + final var retKafkaProps = new Properties(); // Add properties from the Kafka property array if (kafkaProperties != null) { - for (int i = 0; i < kafkaProperties.length; i++) { + for (var i = 0; i < kafkaProperties.length; i++) { retKafkaProps.setProperty(kafkaProperties[i][0], kafkaProperties[i][1]); } } @@ -250,8 +250,8 @@ public class KafkaCarrierTechnologyParameters extends CarrierTechnologyParameter "not specified, must be specified as a list of strings"); } - BeanValidationResult result = new BeanValidationResult("consumerTopicList", consumerTopicList); - int item = 0; + var result = new BeanValidationResult("consumerTopicList", consumerTopicList); + var item = 0; for (final String consumerTopic : consumerTopicList) { if (StringUtils.isBlank(consumerTopic)) { result.addResult(ENTRY + item, consumerTopic, ValidationStatus.INVALID, Validated.IS_BLANK); @@ -272,13 +272,13 @@ public class KafkaCarrierTechnologyParameters extends CarrierTechnologyParameter return null; } - BeanValidationResult result = new BeanValidationResult(KAFKA_PROPERTIES, kafkaProperties); + var result = new BeanValidationResult(KAFKA_PROPERTIES, kafkaProperties); - for (int i = 0; i < kafkaProperties.length; i++) { + for (var i = 0; i < kafkaProperties.length; i++) { final String label = ENTRY + i; final String[] kafkaProperty = kafkaProperties[i]; final List<String> value = (kafkaProperty == null ? null : Arrays.asList(kafkaProperty)); - final BeanValidationResult result2 = new BeanValidationResult(label, value); + final var result2 = new BeanValidationResult(label, value); if (kafkaProperty == null) { // note: add to result, not result2 |