summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorRam Krishna Verma <ram_krishna.verma@bell.ca>2022-10-14 10:05:29 -0400
committerRam Krishna Verma <ram_krishna.verma@bell.ca>2022-10-17 14:23:21 +0000
commit2ece05337011d7b2d40e97858ccd3627662bc525 (patch)
treedada8eeb41c025cd41895160a73961ab8a03553d
parent793e0cd5a36e17712422fde15b3ade040dbcc419 (diff)
Add support for KafkaAvroSerializer in apex-pdp
Adding the support for KafkaAvroSerializer to deserialize the messages sent on a kafka topic using the KafkaAvroSerializer. The default StringDeserializer that comes from KafkaConsumer is not able to work with avro encoded messages. Issue-ID: POLICY-4369 Change-Id: Ia97bee9546baa78c237e21a220df9374b84121dd Signed-off-by: Ram Krishna Verma <ram_krishna.verma@bell.ca> (cherry picked from commit b1296de01e61cb8d8e2a8476bad108b2046783de)
-rw-r--r--plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/pom.xml13
-rw-r--r--plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumer.java12
-rw-r--r--plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/test/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumerTest.java28
3 files changed, 47 insertions, 6 deletions
diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/pom.xml b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/pom.xml
index 341b76dfd..33fdfff68 100644
--- a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/pom.xml
+++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/pom.xml
@@ -1,6 +1,7 @@
<!--
============LICENSE_START=======================================================
Copyright (C) 2018 Ericsson. All rights reserved.
+ Modifications Copyright (C) 2022 Bell Canada. All rights reserved.
================================================================================
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
@@ -42,5 +43,17 @@
</exclusion>
</exclusions>
</dependency>
+ <dependency>
+ <groupId>io.confluent</groupId>
+ <artifactId>kafka-avro-serializer</artifactId>
+ <version>7.2.1</version>
+ </dependency>
</dependencies>
+
+ <repositories>
+ <repository>
+ <id>confluent</id>
+ <url>https://packages.confluent.io/maven/</url>
+ </repository>
+ </repositories>
</project>
diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumer.java b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumer.java
index 95379d457..55d46b522 100644
--- a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumer.java
+++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumer.java
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* Copyright (C) 2016-2018 Ericsson. All rights reserved.
* Modifications Copyright (C) 2019-2020 Nordix Foundation.
- * Modifications Copyright (C) 2021 Bell Canada. All rights reserved.
+ * Modifications Copyright (C) 2021-2022 Bell Canada. All rights reserved.
* Modifications Copyright (C) 2021 AT&T Intellectual Property. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
@@ -75,7 +75,7 @@ public class ApexKafkaConsumer extends ApexPluginsEventConsumer {
@Override
public void run() {
// Kick off the Kafka consumer
- try (KafkaConsumer<String, String> kafkaConsumer =
+ try (KafkaConsumer<String, Object> kafkaConsumer =
new KafkaConsumer<>(kafkaConsumerProperties.getKafkaConsumerProperties())) {
kafkaConsumer.subscribe(kafkaConsumerProperties.getConsumerTopicListAsCollection());
if (LOGGER.isDebugEnabled()) {
@@ -86,11 +86,11 @@ public class ApexKafkaConsumer extends ApexPluginsEventConsumer {
// The endless loop that receives events over Kafka
while (consumerThread.isAlive() && !stopOrderedFlag) {
try {
- final ConsumerRecords<String, String> records =
+ final ConsumerRecords<String, Object> records =
kafkaConsumer.poll(kafkaConsumerProperties.getConsumerPollDuration());
- for (final ConsumerRecord<String, String> dataRecord : records) {
+ for (final ConsumerRecord<String, Object> dataRecord : records) {
traceIfTraceEnabled(dataRecord);
- eventReceiver.receiveEvent(new Properties(), dataRecord.value());
+ eventReceiver.receiveEvent(new Properties(), dataRecord.value().toString());
}
} catch (final Exception e) {
LOGGER.debug("error receiving events on thread {}", consumerThread.getName(), e);
@@ -104,7 +104,7 @@ public class ApexKafkaConsumer extends ApexPluginsEventConsumer {
*
* @param dataRecord the record to trace
*/
- private void traceIfTraceEnabled(final ConsumerRecord<String, String> dataRecord) {
+ private void traceIfTraceEnabled(final ConsumerRecord<String, Object> dataRecord) {
if (LOGGER.isTraceEnabled()) {
LOGGER.trace("event received for {} for forwarding to Apex engine : {} {}",
this.getClass().getName() + ":" + this.name, dataRecord.key(), dataRecord.value());
diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/test/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumerTest.java b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/test/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumerTest.java
index edb4a1a7b..1ef3550e4 100644
--- a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/test/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumerTest.java
+++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/test/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumerTest.java
@@ -2,6 +2,7 @@
* ============LICENSE_START=======================================================
* Copyright (C) 2019 Samsung. All rights reserved.
* Modifications Copyright (C) 2020 Nordix Foundation
+ * Modifications Copyright (C) 2022 Bell Canada. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -38,9 +39,12 @@ import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerPeeredMo
public class ApexKafkaConsumerTest {
ApexKafkaConsumer apexKafkaConsumer = null;
+ ApexKafkaConsumer apexKafkaConsumer2 = null;
EventHandlerParameters consumerParameters = null;
+ EventHandlerParameters consumerParameters2 = null;
ApexEventReceiver incomingEventReceiver = null;
ApexEventProducer apexKafkaProducer = null;
+ KafkaCarrierTechnologyParameters kafkaParameters = null;
/**
* Set up testing.
@@ -55,21 +59,37 @@ public class ApexKafkaConsumerTest {
consumerParameters
.setCarrierTechnologyParameters(new KafkaCarrierTechnologyParameters() {});
apexKafkaConsumer.init("TestApexKafkaConsumer", consumerParameters, incomingEventReceiver);
+
+ apexKafkaConsumer2 = new ApexKafkaConsumer();
+ consumerParameters2 = new EventHandlerParameters();
+ kafkaParameters = new KafkaCarrierTechnologyParameters();
+ String[][] kafkaProperties = {
+ {"value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer"},
+ {"schema.registry.url", "[http://test-registory:8080]"}
+ };
+ kafkaParameters.setKafkaProperties(kafkaProperties);
+
+ consumerParameters2
+ .setCarrierTechnologyParameters(kafkaParameters);
+ apexKafkaConsumer2.init("TestApexKafkaConsumer2", consumerParameters2, incomingEventReceiver);
}
@Test
public void testStart() {
assertThatCode(apexKafkaConsumer::start).doesNotThrowAnyException();
+ assertThatCode(apexKafkaConsumer2::start).doesNotThrowAnyException();
}
@Test
public void testGetName() {
assertEquals("TestApexKafkaConsumer", apexKafkaConsumer.getName());
+ assertEquals("TestApexKafkaConsumer2", apexKafkaConsumer2.getName());
}
@Test
public void testGetPeeredReference() {
assertNull(apexKafkaConsumer.getPeeredReference(EventHandlerPeeredMode.REQUESTOR));
+ assertNull(apexKafkaConsumer2.getPeeredReference(EventHandlerPeeredMode.REQUESTOR));
}
@Test
@@ -78,22 +98,30 @@ public class ApexKafkaConsumerTest {
apexKafkaConsumer, apexKafkaProducer);
apexKafkaConsumer.setPeeredReference(EventHandlerPeeredMode.REQUESTOR, peeredReference);
assertNotNull(apexKafkaConsumer.getPeeredReference(EventHandlerPeeredMode.REQUESTOR));
+
+ PeeredReference peeredReference2 = new PeeredReference(EventHandlerPeeredMode.REQUESTOR,
+ apexKafkaConsumer2, apexKafkaProducer);
+ apexKafkaConsumer2.setPeeredReference(EventHandlerPeeredMode.REQUESTOR, peeredReference2);
+ assertNotNull(apexKafkaConsumer2.getPeeredReference(EventHandlerPeeredMode.REQUESTOR));
}
@Test(expected = java.lang.NullPointerException.class)
public void testRun() {
apexKafkaConsumer.run();
+ apexKafkaConsumer2.run();
}
@Test(expected = java.lang.NullPointerException.class)
public void testStop() {
apexKafkaConsumer.stop();
+ apexKafkaConsumer2.stop();
}
@Test(expected = ApexEventException.class)
public void testInitWithNonKafkaCarrierTechnologyParameters() throws ApexEventException {
consumerParameters.setCarrierTechnologyParameters(new CarrierTechnologyParameters() {});
apexKafkaConsumer.init("TestApexKafkaConsumer", consumerParameters, incomingEventReceiver);
+ apexKafkaConsumer2.init("TestApexKafkaConsumer2", consumerParameters, incomingEventReceiver);
}
}