summaryrefslogtreecommitdiffstats
path: root/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/test/java
diff options
context:
space:
mode:
authorRam Krishna Verma <ram_krishna.verma@bell.ca>2022-10-14 10:05:29 -0400
committerRam Krishna Verma <ram_krishna.verma@bell.ca>2022-10-14 10:05:34 -0400
commitb1296de01e61cb8d8e2a8476bad108b2046783de (patch)
treefd26c1445354c444111e1b4a7c4171b8a814e036 /plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/test/java
parentb6fc8c57ab690c013cf4eb3f83afa899c9e08294 (diff)
Add support for KafkaAvroSerializer in apex-pdp
Adding the support for KafkaAvroSerializer to deserialize the messages sent on a kafka topic using the KafkaAvroSerializer. The default StringDeserializer that comes from KafkaConsumer is not able to work with avro encoded messages. Issue-ID: POLICY-4369 Change-Id: Ia97bee9546baa78c237e21a220df9374b84121dd Signed-off-by: Ram Krishna Verma <ram_krishna.verma@bell.ca>
Diffstat (limited to 'plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/test/java')
-rw-r--r--plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/test/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumerTest.java28
1 files changed, 28 insertions, 0 deletions
diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/test/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumerTest.java b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/test/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumerTest.java
index edb4a1a7b..1ef3550e4 100644
--- a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/test/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumerTest.java
+++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/test/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumerTest.java
@@ -2,6 +2,7 @@
* ============LICENSE_START=======================================================
* Copyright (C) 2019 Samsung. All rights reserved.
* Modifications Copyright (C) 2020 Nordix Foundation
+ * Modifications Copyright (C) 2022 Bell Canada. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -38,9 +39,12 @@ import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerPeeredMo
public class ApexKafkaConsumerTest {
ApexKafkaConsumer apexKafkaConsumer = null;
+ ApexKafkaConsumer apexKafkaConsumer2 = null;
EventHandlerParameters consumerParameters = null;
+ EventHandlerParameters consumerParameters2 = null;
ApexEventReceiver incomingEventReceiver = null;
ApexEventProducer apexKafkaProducer = null;
+ KafkaCarrierTechnologyParameters kafkaParameters = null;
/**
* Set up testing.
@@ -55,21 +59,37 @@ public class ApexKafkaConsumerTest {
consumerParameters
.setCarrierTechnologyParameters(new KafkaCarrierTechnologyParameters() {});
apexKafkaConsumer.init("TestApexKafkaConsumer", consumerParameters, incomingEventReceiver);
+
+ apexKafkaConsumer2 = new ApexKafkaConsumer();
+ consumerParameters2 = new EventHandlerParameters();
+ kafkaParameters = new KafkaCarrierTechnologyParameters();
+ String[][] kafkaProperties = {
+ {"value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer"},
+ {"schema.registry.url", "[http://test-registory:8080]"}
+ };
+ kafkaParameters.setKafkaProperties(kafkaProperties);
+
+ consumerParameters2
+ .setCarrierTechnologyParameters(kafkaParameters);
+ apexKafkaConsumer2.init("TestApexKafkaConsumer2", consumerParameters2, incomingEventReceiver);
}
@Test
public void testStart() {
assertThatCode(apexKafkaConsumer::start).doesNotThrowAnyException();
+ assertThatCode(apexKafkaConsumer2::start).doesNotThrowAnyException();
}
@Test
public void testGetName() {
assertEquals("TestApexKafkaConsumer", apexKafkaConsumer.getName());
+ assertEquals("TestApexKafkaConsumer2", apexKafkaConsumer2.getName());
}
@Test
public void testGetPeeredReference() {
assertNull(apexKafkaConsumer.getPeeredReference(EventHandlerPeeredMode.REQUESTOR));
+ assertNull(apexKafkaConsumer2.getPeeredReference(EventHandlerPeeredMode.REQUESTOR));
}
@Test
@@ -78,22 +98,30 @@ public class ApexKafkaConsumerTest {
apexKafkaConsumer, apexKafkaProducer);
apexKafkaConsumer.setPeeredReference(EventHandlerPeeredMode.REQUESTOR, peeredReference);
assertNotNull(apexKafkaConsumer.getPeeredReference(EventHandlerPeeredMode.REQUESTOR));
+
+ PeeredReference peeredReference2 = new PeeredReference(EventHandlerPeeredMode.REQUESTOR,
+ apexKafkaConsumer2, apexKafkaProducer);
+ apexKafkaConsumer2.setPeeredReference(EventHandlerPeeredMode.REQUESTOR, peeredReference2);
+ assertNotNull(apexKafkaConsumer2.getPeeredReference(EventHandlerPeeredMode.REQUESTOR));
}
@Test(expected = java.lang.NullPointerException.class)
public void testRun() {
apexKafkaConsumer.run();
+ apexKafkaConsumer2.run();
}
@Test(expected = java.lang.NullPointerException.class)
public void testStop() {
apexKafkaConsumer.stop();
+ apexKafkaConsumer2.stop();
}
@Test(expected = ApexEventException.class)
public void testInitWithNonKafkaCarrierTechnologyParameters() throws ApexEventException {
consumerParameters.setCarrierTechnologyParameters(new CarrierTechnologyParameters() {});
apexKafkaConsumer.init("TestApexKafkaConsumer", consumerParameters, incomingEventReceiver);
+ apexKafkaConsumer2.init("TestApexKafkaConsumer2", consumerParameters, incomingEventReceiver);
}
}