aboutsummaryrefslogtreecommitdiffstats
path: root/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka')
-rw-r--r--plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/pom.xml13
-rw-r--r--plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumer.java12
-rw-r--r--plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/test/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumerTest.java28
3 files changed, 47 insertions, 6 deletions
diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/pom.xml b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/pom.xml
index 85cbe0f6e..0423db448 100644
--- a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/pom.xml
+++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/pom.xml
@@ -1,6 +1,7 @@
<!--
============LICENSE_START=======================================================
Copyright (C) 2018 Ericsson. All rights reserved.
+ Modifications Copyright (C) 2022 Bell Canada. All rights reserved.
================================================================================
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
@@ -35,5 +36,17 @@
<artifactId>kafka-clients</artifactId>
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>io.confluent</groupId>
+ <artifactId>kafka-avro-serializer</artifactId>
+ <version>7.2.1</version>
+ </dependency>
</dependencies>
+
+ <repositories>
+ <repository>
+ <id>confluent</id>
+ <url>https://packages.confluent.io/maven/</url>
+ </repository>
+ </repositories>
</project>
diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumer.java b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumer.java
index 95379d457..55d46b522 100644
--- a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumer.java
+++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumer.java
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* Copyright (C) 2016-2018 Ericsson. All rights reserved.
* Modifications Copyright (C) 2019-2020 Nordix Foundation.
- * Modifications Copyright (C) 2021 Bell Canada. All rights reserved.
+ * Modifications Copyright (C) 2021-2022 Bell Canada. All rights reserved.
* Modifications Copyright (C) 2021 AT&T Intellectual Property. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
@@ -75,7 +75,7 @@ public class ApexKafkaConsumer extends ApexPluginsEventConsumer {
@Override
public void run() {
// Kick off the Kafka consumer
- try (KafkaConsumer<String, String> kafkaConsumer =
+ try (KafkaConsumer<String, Object> kafkaConsumer =
new KafkaConsumer<>(kafkaConsumerProperties.getKafkaConsumerProperties())) {
kafkaConsumer.subscribe(kafkaConsumerProperties.getConsumerTopicListAsCollection());
if (LOGGER.isDebugEnabled()) {
@@ -86,11 +86,11 @@ public class ApexKafkaConsumer extends ApexPluginsEventConsumer {
// The endless loop that receives events over Kafka
while (consumerThread.isAlive() && !stopOrderedFlag) {
try {
- final ConsumerRecords<String, String> records =
+ final ConsumerRecords<String, Object> records =
kafkaConsumer.poll(kafkaConsumerProperties.getConsumerPollDuration());
- for (final ConsumerRecord<String, String> dataRecord : records) {
+ for (final ConsumerRecord<String, Object> dataRecord : records) {
traceIfTraceEnabled(dataRecord);
- eventReceiver.receiveEvent(new Properties(), dataRecord.value());
+ eventReceiver.receiveEvent(new Properties(), dataRecord.value().toString());
}
} catch (final Exception e) {
LOGGER.debug("error receiving events on thread {}", consumerThread.getName(), e);
@@ -104,7 +104,7 @@ public class ApexKafkaConsumer extends ApexPluginsEventConsumer {
*
* @param dataRecord the record to trace
*/
- private void traceIfTraceEnabled(final ConsumerRecord<String, String> dataRecord) {
+ private void traceIfTraceEnabled(final ConsumerRecord<String, Object> dataRecord) {
if (LOGGER.isTraceEnabled()) {
LOGGER.trace("event received for {} for forwarding to Apex engine : {} {}",
this.getClass().getName() + ":" + this.name, dataRecord.key(), dataRecord.value());
diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/test/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumerTest.java b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/test/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumerTest.java
index edb4a1a7b..1ef3550e4 100644
--- a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/test/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumerTest.java
+++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/test/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumerTest.java
@@ -2,6 +2,7 @@
* ============LICENSE_START=======================================================
* Copyright (C) 2019 Samsung. All rights reserved.
* Modifications Copyright (C) 2020 Nordix Foundation
+ * Modifications Copyright (C) 2022 Bell Canada. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -38,9 +39,12 @@ import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerPeeredMo
public class ApexKafkaConsumerTest {
ApexKafkaConsumer apexKafkaConsumer = null;
+ ApexKafkaConsumer apexKafkaConsumer2 = null;
EventHandlerParameters consumerParameters = null;
+ EventHandlerParameters consumerParameters2 = null;
ApexEventReceiver incomingEventReceiver = null;
ApexEventProducer apexKafkaProducer = null;
+ KafkaCarrierTechnologyParameters kafkaParameters = null;
/**
* Set up testing.
@@ -55,21 +59,37 @@ public class ApexKafkaConsumerTest {
consumerParameters
.setCarrierTechnologyParameters(new KafkaCarrierTechnologyParameters() {});
apexKafkaConsumer.init("TestApexKafkaConsumer", consumerParameters, incomingEventReceiver);
+
+ apexKafkaConsumer2 = new ApexKafkaConsumer();
+ consumerParameters2 = new EventHandlerParameters();
+ kafkaParameters = new KafkaCarrierTechnologyParameters();
+ String[][] kafkaProperties = {
+ {"value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer"},
+ {"schema.registry.url", "[http://test-registory:8080]"}
+ };
+ kafkaParameters.setKafkaProperties(kafkaProperties);
+
+ consumerParameters2
+ .setCarrierTechnologyParameters(kafkaParameters);
+ apexKafkaConsumer2.init("TestApexKafkaConsumer2", consumerParameters2, incomingEventReceiver);
}
@Test
public void testStart() {
assertThatCode(apexKafkaConsumer::start).doesNotThrowAnyException();
+ assertThatCode(apexKafkaConsumer2::start).doesNotThrowAnyException();
}
@Test
public void testGetName() {
assertEquals("TestApexKafkaConsumer", apexKafkaConsumer.getName());
+ assertEquals("TestApexKafkaConsumer2", apexKafkaConsumer2.getName());
}
@Test
public void testGetPeeredReference() {
assertNull(apexKafkaConsumer.getPeeredReference(EventHandlerPeeredMode.REQUESTOR));
+ assertNull(apexKafkaConsumer2.getPeeredReference(EventHandlerPeeredMode.REQUESTOR));
}
@Test
@@ -78,22 +98,30 @@ public class ApexKafkaConsumerTest {
apexKafkaConsumer, apexKafkaProducer);
apexKafkaConsumer.setPeeredReference(EventHandlerPeeredMode.REQUESTOR, peeredReference);
assertNotNull(apexKafkaConsumer.getPeeredReference(EventHandlerPeeredMode.REQUESTOR));
+
+ PeeredReference peeredReference2 = new PeeredReference(EventHandlerPeeredMode.REQUESTOR,
+ apexKafkaConsumer2, apexKafkaProducer);
+ apexKafkaConsumer2.setPeeredReference(EventHandlerPeeredMode.REQUESTOR, peeredReference2);
+ assertNotNull(apexKafkaConsumer2.getPeeredReference(EventHandlerPeeredMode.REQUESTOR));
}
@Test(expected = java.lang.NullPointerException.class)
public void testRun() {
apexKafkaConsumer.run();
+ apexKafkaConsumer2.run();
}
@Test(expected = java.lang.NullPointerException.class)
public void testStop() {
apexKafkaConsumer.stop();
+ apexKafkaConsumer2.stop();
}
@Test(expected = ApexEventException.class)
public void testInitWithNonKafkaCarrierTechnologyParameters() throws ApexEventException {
consumerParameters.setCarrierTechnologyParameters(new CarrierTechnologyParameters() {});
apexKafkaConsumer.init("TestApexKafkaConsumer", consumerParameters, incomingEventReceiver);
+ apexKafkaConsumer2.init("TestApexKafkaConsumer2", consumerParameters, incomingEventReceiver);
}
}