diff options
Diffstat (limited to 'plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka')
-rw-r--r-- | plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumer.java | 10 | ||||
-rw-r--r-- | plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaProducer.java | 13 | ||||
-rw-r--r-- | plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/KafkaCarrierTechnologyParameters.java (renamed from plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/KAFKACarrierTechnologyParameters.java) | 60 |
3 files changed, 42 insertions, 41 deletions
diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumer.java b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumer.java index 3351a58e9..dfb12617c 100644 --- a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumer.java +++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumer.java @@ -47,7 +47,7 @@ public class ApexKafkaConsumer implements ApexEventConsumer, Runnable { private static final Logger LOGGER = LoggerFactory.getLogger(ApexKafkaConsumer.class); // The Kafka parameters read from the parameter service - private KAFKACarrierTechnologyParameters kafkaConsumerProperties; + private KafkaCarrierTechnologyParameters kafkaConsumerProperties; // The event receiver that will receive events from this consumer private ApexEventReceiver eventReceiver; @@ -79,7 +79,7 @@ public class ApexKafkaConsumer implements ApexEventConsumer, Runnable { this.name = consumerName; // Check and get the Kafka Properties - if (!(consumerParameters.getCarrierTechnologyParameters() instanceof KAFKACarrierTechnologyParameters)) { + if (!(consumerParameters.getCarrierTechnologyParameters() instanceof KafkaCarrierTechnologyParameters)) { LOGGER.warn("specified consumer properties of type \"" + consumerParameters.getCarrierTechnologyParameters().getClass().getCanonicalName() + "\" are not applicable to a Kafka consumer"); @@ -88,10 +88,10 @@ public class ApexKafkaConsumer implements ApexEventConsumer, Runnable { + "\" are not applicable to a Kafka consumer"); } kafkaConsumerProperties = - (KAFKACarrierTechnologyParameters) consumerParameters.getCarrierTechnologyParameters(); + (KafkaCarrierTechnologyParameters) consumerParameters.getCarrierTechnologyParameters(); // Kick off the Kafka consumer - kafkaConsumer = new KafkaConsumer<String, String>(kafkaConsumerProperties.getKafkaConsumerProperties()); + kafkaConsumer = new KafkaConsumer<>(kafkaConsumerProperties.getKafkaConsumerProperties()); kafkaConsumer.subscribe(kafkaConsumerProperties.getConsumerTopicList()); if (LOGGER.isDebugEnabled()) { LOGGER.debug("event receiver for " + this.getClass().getName() + ":" + this.name + " subscribed to topics: " @@ -153,7 +153,7 @@ public class ApexKafkaConsumer implements ApexEventConsumer, Runnable { @Override public void run() { // Kick off the Kafka consumer - kafkaConsumer = new KafkaConsumer<String, String>(kafkaConsumerProperties.getKafkaConsumerProperties()); + kafkaConsumer = new KafkaConsumer<>(kafkaConsumerProperties.getKafkaConsumerProperties()); kafkaConsumer.subscribe(kafkaConsumerProperties.getConsumerTopicList()); if (LOGGER.isDebugEnabled()) { LOGGER.debug("event receiver for " + this.getClass().getName() + ":" + this.name + " subscribed to topics: " diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaProducer.java b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaProducer.java index fb851bc70..c83c0ae1e 100644 --- a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaProducer.java +++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaProducer.java @@ -46,7 +46,7 @@ public class ApexKafkaProducer implements ApexEventProducer { private static final Logger LOGGER = LoggerFactory.getLogger(ApexKafkaProducer.class); // The Kafka parameters read from the parameter service - private KAFKACarrierTechnologyParameters kafkaProducerProperties; + private KafkaCarrierTechnologyParameters kafkaProducerProperties; // The Kafka Producer used to send events using Kafka private Producer<String, Object> kafkaProducer; @@ -63,13 +63,14 @@ public class ApexKafkaProducer implements ApexEventProducer { this.name = producerName; // Check and get the Kafka Properties - if (!(producerParameters.getCarrierTechnologyParameters() instanceof KAFKACarrierTechnologyParameters)) { - LOGGER.warn("specified producer properties are not applicable to a Kafka producer (" + this.name + ")"); + if (!(producerParameters.getCarrierTechnologyParameters() instanceof KafkaCarrierTechnologyParameters)) { + String message = "specified producer properties are not applicable to a Kafka producer (" + this.name + ")"; + LOGGER.warn(message); throw new ApexEventException( - "specified producer properties are not applicable to a Kafka producer (" + this.name + ")"); + message); } kafkaProducerProperties = - (KAFKACarrierTechnologyParameters) producerParameters.getCarrierTechnologyParameters(); + (KafkaCarrierTechnologyParameters) producerParameters.getCarrierTechnologyParameters(); } /* @@ -122,7 +123,7 @@ public class ApexKafkaProducer implements ApexEventProducer { // Kafka producer must be started in the same thread as it is stopped, so we must start it here if (kafkaProducer == null) { // Kick off the Kafka producer - kafkaProducer = new KafkaProducer<String, Object>(kafkaProducerProperties.getKafkaProducerProperties()); + kafkaProducer = new KafkaProducer<>(kafkaProducerProperties.getKafkaProducerProperties()); if (LOGGER.isDebugEnabled()) { LOGGER.debug("event producer " + this.name + " is ready to send to topics: " + kafkaProducerProperties.getProducerTopic()); diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/KAFKACarrierTechnologyParameters.java b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/KafkaCarrierTechnologyParameters.java index 5ce96662e..9d7cc77f3 100644 --- a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/KAFKACarrierTechnologyParameters.java +++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/KafkaCarrierTechnologyParameters.java @@ -33,7 +33,7 @@ import org.onap.policy.common.parameters.ValidationStatus; * * @author Liam Fallon (liam.fallon@ericsson.com) */ -public class KAFKACarrierTechnologyParameters extends CarrierTechnologyParameters { +public class KafkaCarrierTechnologyParameters extends CarrierTechnologyParameters { // @formatter:off /** The label of this carrier technology. */ public static final String KAFKA_CARRIER_TECHNOLOGY_LABEL = "KAFKA"; @@ -48,23 +48,23 @@ public class KAFKACarrierTechnologyParameters extends CarrierTechnologyParameter private static final String SPECIFY_AS_STRING_MESSAGE = "not specified, must be specified as a string"; // Default parameter values - private static final String DEFAULT_ACKS = "all"; - private static final String DEFAULT_BOOTSTRAP_SERVERS = "localhost:9092"; - private static final int DEFAULT_RETRIES = 0; - private static final int DEFAULT_BATCH_SIZE = 16384; - private static final int DEFAULT_LINGER_TIME = 1; - private static final long DEFAULT_BUFFER_MEMORY = 33554432; - private static final String DEFAULT_GROUP_ID = "default-group-id"; - private static final boolean DEFAULT_ENABLE_AUTO_COMMIT = true; - private static final int DEFAULT_AUTO_COMMIT_TIME = 1000; - private static final int DEFAULT_SESSION_TIMEOUT = 30000; - private static final String DEFAULT_PRODUCER_TOPIC = "apex-out"; - private static final int DEFAULT_CONSUMER_POLL_TIME = 100; - private static final String[] DEFAULT_CONSUMER_TOPIC_LIST = {"apex-in"}; - private static final String DEFAULT_KEY_SERIALIZER = "org.apache.kafka.common.serialization.StringSerializer"; - private static final String DEFAULT_VALUE_SERIALIZER = "org.apache.kafka.common.serialization.StringSerializer"; - private static final String DEFAULT_KEY_DESERIALIZER = "org.apache.kafka.common.serialization.StringDeserializer"; - private static final String DEFAULT_VALUE_DESERIALIZER = "org.apache.kafka.common.serialization.StringDeserializer"; + private static final String DEFAULT_ACKS = "all"; + private static final String DEFAULT_BOOT_SERVERS = "localhost:9092"; + private static final int DEFAULT_RETRIES = 0; + private static final int DEFAULT_BATCH_SIZE = 16384; + private static final int DEFAULT_LINGER_TIME = 1; + private static final long DEFAULT_BUFFER_MEMORY = 33554432; + private static final String DEFAULT_GROUP_ID = "default-group-id"; + private static final boolean DEFAULT_ENABLE_AUTOCMIT = true; + private static final int DEFAULT_AUTO_COMMIT_TIME = 1000; + private static final int DEFAULT_SESSION_TIMEOUT = 30000; + private static final String DEFAULT_PROD_TOPIC = "apex-out"; + private static final int DEFAULT_CONS_POLL_TIME = 100; + private static final String[] DEFAULT_CONS_TOPICLIST = {"apex-in"}; + private static final String DEFAULT_KEY_SERZER = "org.apache.kafka.common.serialization.StringSerializer"; + private static final String DEFAULT_VAL_SERZER = "org.apache.kafka.common.serialization.StringSerializer"; + private static final String DEFAULT_KEY_DESZER = "org.apache.kafka.common.serialization.StringDeserializer"; + private static final String DEFAULT_VALUE_DESZER = "org.apache.kafka.common.serialization.StringDeserializer"; // Parameter property map tokens private static final String PROPERTY_BOOTSTRAP_SERVERS = "bootstrap.servers"; @@ -83,30 +83,30 @@ public class KAFKACarrierTechnologyParameters extends CarrierTechnologyParameter private static final String PROPERTY_VALUE_DESERIALIZER = "value.deserializer"; // kafka carrier parameters - private String bootstrapServers = DEFAULT_BOOTSTRAP_SERVERS; + private String bootstrapServers = DEFAULT_BOOT_SERVERS; private String acks = DEFAULT_ACKS; private int retries = DEFAULT_RETRIES; private int batchSize = DEFAULT_BATCH_SIZE; private int lingerTime = DEFAULT_LINGER_TIME; private long bufferMemory = DEFAULT_BUFFER_MEMORY; private String groupId = DEFAULT_GROUP_ID; - private boolean enableAutoCommit = DEFAULT_ENABLE_AUTO_COMMIT; + private boolean enableAutoCommit = DEFAULT_ENABLE_AUTOCMIT; private int autoCommitTime = DEFAULT_AUTO_COMMIT_TIME; private int sessionTimeout = DEFAULT_SESSION_TIMEOUT; - private String producerTopic = DEFAULT_PRODUCER_TOPIC; - private int consumerPollTime = DEFAULT_CONSUMER_POLL_TIME; - private String[] consumerTopicList = DEFAULT_CONSUMER_TOPIC_LIST; - private String keySerializer = DEFAULT_KEY_SERIALIZER; - private String valueSerializer = DEFAULT_VALUE_SERIALIZER; - private String keyDeserializer = DEFAULT_KEY_DESERIALIZER; - private String valueDeserializer = DEFAULT_VALUE_DESERIALIZER; + private String producerTopic = DEFAULT_PROD_TOPIC; + private int consumerPollTime = DEFAULT_CONS_POLL_TIME; + private String[] consumerTopicList = DEFAULT_CONS_TOPICLIST; + private String keySerializer = DEFAULT_KEY_SERZER; + private String valueSerializer = DEFAULT_VAL_SERZER; + private String keyDeserializer = DEFAULT_KEY_DESZER; + private String valueDeserializer = DEFAULT_VALUE_DESZER; // @formatter:on /** * Constructor to create a kafka carrier technology parameters instance and register the instance with the parameter * service. */ - public KAFKACarrierTechnologyParameters() { + public KafkaCarrierTechnologyParameters() { super(); // Set the carrier technology properties for the kafka carrier technology @@ -394,7 +394,7 @@ public class KAFKACarrierTechnologyParameters extends CarrierTechnologyParameter return result; } - + private void validateConsumerTopicList(final GroupValidationResult result) { if (consumerTopicList == null || consumerTopicList.length == 0) { result.setResult("consumerTopicList", ValidationStatus.INVALID, @@ -414,6 +414,6 @@ public class KAFKACarrierTechnologyParameters extends CarrierTechnologyParameter } private boolean isNullOrBlank(final String stringValue) { - return stringValue == null || stringValue.trim().length() == 0; + return stringValue == null || stringValue.trim().length() == 0; } } |