summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--pom.xml29
-rw-r--r--src/main/java/org/onap/cps/ncmp/dmi/config/kafka/KafkaConfig.java127
-rw-r--r--src/main/java/org/onap/cps/ncmp/dmi/notifications/avc/DmiDataAvcCloudEventCreator.java104
-rw-r--r--src/main/java/org/onap/cps/ncmp/dmi/notifications/avc/DmiDataAvcEventCreator.java64
-rw-r--r--src/main/java/org/onap/cps/ncmp/dmi/notifications/avc/DmiDataAvcEventProducer.java20
-rw-r--r--src/main/java/org/onap/cps/ncmp/dmi/notifications/avc/DmiDataAvcEventSimulationController.java12
-rw-r--r--src/main/resources/application.yml8
-rw-r--r--src/test/groovy/org/onap/cps/ncmp/dmi/api/kafka/MessagingBaseSpec.groovy18
-rw-r--r--src/test/groovy/org/onap/cps/ncmp/dmi/config/kafka/KafkaConfigSpec.groovy42
-rw-r--r--src/test/groovy/org/onap/cps/ncmp/dmi/notifications/async/AsyncTaskExecutorIntegrationSpec.groovy8
-rw-r--r--src/test/groovy/org/onap/cps/ncmp/dmi/notifications/avc/AvcEventExecutorIntegrationSpec.groovy21
-rw-r--r--src/test/groovy/org/onap/cps/ncmp/dmi/notifications/avc/SubscriptionEventConsumerSpec.groovy3
-rw-r--r--src/test/resources/application.yml10
13 files changed, 360 insertions, 106 deletions
diff --git a/pom.xml b/pom.xml
index e35c840f..e08f725f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -40,7 +40,7 @@
<properties>
<app>org.onap.cps.ncmp.dmi.Application</app>
<base.image>${docker.pull.registry}/onap/integration-java11:8.0.0</base.image>
- <cps.version>3.2.6</cps.version>
+ <cps.version>3.3.3</cps.version>
<image.tag>${project.version}-${maven.build.timestamp}</image.tag>
<jacoco.minimum.coverage>0.98</jacoco.minimum.coverage>
<maven.build.timestamp.format>yyyyMMdd'T'HHmmss'Z'</maven.build.timestamp.format>
@@ -59,6 +59,21 @@
<version>2.2.10</version>
</dependency>
<dependency>
+ <groupId>io.cloudevents</groupId>
+ <artifactId>cloudevents-json-jackson</artifactId>
+ <version>2.5.0</version>
+ </dependency>
+ <dependency>
+ <groupId>io.cloudevents</groupId>
+ <artifactId>cloudevents-kafka</artifactId>
+ <version>2.5.0</version>
+ </dependency>
+ <dependency>
+ <groupId>io.cloudevents</groupId>
+ <artifactId>cloudevents-spring</artifactId>
+ <version>2.5.0</version>
+ </dependency>
+ <dependency>
<groupId>net.logstash.logback</groupId>
<artifactId>logstash-logback-encoder</artifactId>
<version>7.0.1</version>
@@ -131,6 +146,18 @@
<artifactId>swagger-annotations</artifactId>
</dependency>
<dependency>
+ <groupId>io.cloudevents</groupId>
+ <artifactId>cloudevents-json-jackson</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.cloudevents</groupId>
+ <artifactId>cloudevents-kafka</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.cloudevents</groupId>
+ <artifactId>cloudevents-spring</artifactId>
+ </dependency>
+ <dependency>
<groupId>net.logstash.logback</groupId>
<artifactId>logstash-logback-encoder</artifactId>
</dependency>
diff --git a/src/main/java/org/onap/cps/ncmp/dmi/config/kafka/KafkaConfig.java b/src/main/java/org/onap/cps/ncmp/dmi/config/kafka/KafkaConfig.java
new file mode 100644
index 00000000..cb617f9e
--- /dev/null
+++ b/src/main/java/org/onap/cps/ncmp/dmi/config/kafka/KafkaConfig.java
@@ -0,0 +1,127 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2023 Nordix Foundation
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.dmi.config.kafka;
+
+import io.cloudevents.CloudEvent;
+import java.util.Map;
+import lombok.RequiredArgsConstructor;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.context.annotation.Primary;
+import org.springframework.kafka.annotation.EnableKafka;
+import org.springframework.kafka.core.ConsumerFactory;
+import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
+import org.springframework.kafka.core.DefaultKafkaProducerFactory;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.core.ProducerFactory;
+import org.springframework.kafka.support.serializer.JsonDeserializer;
+import org.springframework.kafka.support.serializer.JsonSerializer;
+
+/**
+ * kafka Configuration for legacy and cloud events.
+ *
+ * @param <T> valid legacy event to be published over the wire.
+ */
+@Configuration
+@EnableKafka
+@RequiredArgsConstructor
+public class KafkaConfig<T> {
+
+ private final KafkaProperties kafkaProperties;
+
+ /**
+ * This sets the strategy for creating legacy Kafka producer instance from kafka properties defined into
+ * application.yml and replaces value-serializer by JsonSerializer.
+ *
+ * @return legacy event producer instance.
+ */
+ @Bean
+ public ProducerFactory<String, T> legacyEventProducerFactory() {
+ final Map<String, Object> producerConfigProperties = kafkaProperties.buildProducerProperties();
+ producerConfigProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
+ return new DefaultKafkaProducerFactory<>(producerConfigProperties);
+ }
+
+ /**
+ * The ConsumerFactory implementation is to produce new legacy instance for provided kafka properties defined
+ * into application.yml and replaces deserializer-value by JsonDeserializer.
+ *
+ * @return an instance of legacy consumer factory.
+ */
+ @Bean
+ public ConsumerFactory<String, T> legacyEventConsumerFactory() {
+ final Map<String, Object> consumerConfigProperties = kafkaProperties.buildConsumerProperties();
+ consumerConfigProperties.put("spring.deserializer.value.delegate.class", JsonDeserializer.class);
+ return new DefaultKafkaConsumerFactory<>(consumerConfigProperties);
+ }
+
+ /**
+ * This sets the strategy for creating cloud Kafka producer instance from kafka properties defined into
+ * application.yml with CloudEventSerializer.
+ *
+ * @return cloud event producer instance.
+ */
+ @Bean
+ public ProducerFactory<String, CloudEvent> cloudEventProducerFactory() {
+ final Map<String, Object> producerConfigProperties = kafkaProperties.buildProducerProperties();
+ return new DefaultKafkaProducerFactory<>(producerConfigProperties);
+ }
+
+ /**
+ * The ConsumerFactory implementation to produce new legacy instance for provided kafka properties defined
+ * into application.yml having CloudEventDeserializer as deserializer-value.
+ *
+ * @return an instance of cloud consumer factory.
+ */
+ @Bean
+ public ConsumerFactory<String, CloudEvent> cloudEventConsumerFactory() {
+ final Map<String, Object> consumerConfigProperties = kafkaProperties.buildConsumerProperties();
+ return new DefaultKafkaConsumerFactory<>(consumerConfigProperties);
+ }
+
+ /**
+ * A legacy Kafka event template for executing high-level operations. The legacy producer factory ensure this.
+ *
+ * @return an instance of legacy Kafka template.
+ */
+ @Bean
+ @Primary
+ public KafkaTemplate<String, T> legacyEventKafkaTemplate() {
+ final KafkaTemplate<String, T> kafkaTemplate = new KafkaTemplate<>(legacyEventProducerFactory());
+ kafkaTemplate.setConsumerFactory(legacyEventConsumerFactory());
+ return kafkaTemplate;
+ }
+
+ /**
+ * A cloud Kafka event template for executing high-level operations. The cloud producer factory ensure this.
+ *
+ * @return an instance of cloud Kafka template.
+ */
+ @Bean
+ public KafkaTemplate<String, CloudEvent> cloudEventKafkaTemplate() {
+ final KafkaTemplate<String, CloudEvent> kafkaTemplate = new KafkaTemplate<>(cloudEventProducerFactory());
+ kafkaTemplate.setConsumerFactory(cloudEventConsumerFactory());
+ return kafkaTemplate;
+ }
+
+}
diff --git a/src/main/java/org/onap/cps/ncmp/dmi/notifications/avc/DmiDataAvcCloudEventCreator.java b/src/main/java/org/onap/cps/ncmp/dmi/notifications/avc/DmiDataAvcCloudEventCreator.java
new file mode 100644
index 00000000..b8bd277d
--- /dev/null
+++ b/src/main/java/org/onap/cps/ncmp/dmi/notifications/avc/DmiDataAvcCloudEventCreator.java
@@ -0,0 +1,104 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2023 Nordix Foundation
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.dmi.notifications.avc;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.cloudevents.CloudEvent;
+import io.cloudevents.core.builder.CloudEventBuilder;
+import java.net.URI;
+import java.time.format.DateTimeFormatter;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.onap.cps.ncmp.events.avc1_0_0.AvcEvent;
+import org.onap.cps.ncmp.events.avc1_0_0.Data;
+import org.onap.cps.ncmp.events.avc1_0_0.DatastoreChanges;
+import org.onap.cps.ncmp.events.avc1_0_0.Edit;
+import org.onap.cps.ncmp.events.avc1_0_0.IetfYangPatchYangPatch;
+import org.onap.cps.ncmp.events.avc1_0_0.PushChangeUpdate;
+import org.onap.cps.ncmp.events.avc1_0_0.Value;
+
+/**
+ * Helper to create AvcEvents.
+ */
+@Slf4j
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public class DmiDataAvcCloudEventCreator {
+
+ private static final DateTimeFormatter dateTimeFormatter =
+ DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSZ");
+
+ private static final ObjectMapper objectMapper = new ObjectMapper();
+
+ /**
+ * Creates CloudEvent for DMI Data AVC.
+ *
+ * @param eventCorrelationId correlationid
+ * @return Cloud Event
+ */
+ public static CloudEvent createCloudEvent(final String eventCorrelationId) {
+
+ CloudEvent cloudEvent = null;
+
+ try {
+ cloudEvent = CloudEventBuilder.v1().withId(UUID.randomUUID().toString()).withSource(URI.create("NCMP"))
+ .withType(AvcEvent.class.getName())
+ .withDataSchema(URI.create("urn:cps:" + AvcEvent.class.getName() + ":1.0.0"))
+ .withExtension("correlationid", eventCorrelationId)
+ .withData(objectMapper.writeValueAsBytes(createDmiDataAvcEvent())).build();
+ } catch (final JsonProcessingException jsonProcessingException) {
+ log.error("Unable to convert object to json : {}", jsonProcessingException.getMessage());
+ }
+
+ return cloudEvent;
+ }
+
+ private static AvcEvent createDmiDataAvcEvent() {
+ final AvcEvent avcEvent = new AvcEvent();
+ final Data data = new Data();
+ final PushChangeUpdate pushChangeUpdate = new PushChangeUpdate();
+ final DatastoreChanges datastoreChanges = new DatastoreChanges();
+ final IetfYangPatchYangPatch ietfYangPatchYangPatch = new IetfYangPatchYangPatch();
+ ietfYangPatchYangPatch.setPatchId("abcd");
+ final Edit edit1 = new Edit();
+ final Value value = new Value();
+ final Map<String, Object> attributeMap = new LinkedHashMap<>();
+ attributeMap.put("isHoAllowed", false);
+ value.setAttributes(List.of(attributeMap));
+ edit1.setEditId("editId");
+ edit1.setOperation("replace");
+ edit1.setTarget("target_xpath");
+ edit1.setValue(value);
+ ietfYangPatchYangPatch.setEdit(List.of(edit1));
+ datastoreChanges.setIetfYangPatchYangPatch(ietfYangPatchYangPatch);
+ pushChangeUpdate.setDatastoreChanges(datastoreChanges);
+ data.setPushChangeUpdate(pushChangeUpdate);
+
+ avcEvent.setData(data);
+ return avcEvent;
+ }
+
+} \ No newline at end of file
diff --git a/src/main/java/org/onap/cps/ncmp/dmi/notifications/avc/DmiDataAvcEventCreator.java b/src/main/java/org/onap/cps/ncmp/dmi/notifications/avc/DmiDataAvcEventCreator.java
deleted file mode 100644
index 03ed1c4c..00000000
--- a/src/main/java/org/onap/cps/ncmp/dmi/notifications/avc/DmiDataAvcEventCreator.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * Copyright (C) 2023 Nordix Foundation
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * SPDX-License-Identifier: Apache-2.0
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.cps.ncmp.dmi.notifications.avc;
-
-import java.time.ZonedDateTime;
-import java.time.format.DateTimeFormatter;
-import java.util.LinkedHashMap;
-import java.util.Map;
-import java.util.UUID;
-import lombok.extern.slf4j.Slf4j;
-import org.onap.cps.ncmp.event.model.AvcEvent;
-
-/**
- * Helper to create AvcEvents.
- */
-@Slf4j
-public class DmiDataAvcEventCreator {
-
- private static final DateTimeFormatter dateTimeFormatter
- = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSZ");
-
- /**
- * Create an AVC event.
- *
- * @param eventCorrelationId the event correlation id
- * @return DmiAsyncRequestResponseEvent
- */
- public AvcEvent createEvent(final String eventCorrelationId) {
- final AvcEvent avcEvent = new AvcEvent();
- avcEvent.setEventId(UUID.randomUUID().toString());
- avcEvent.setEventCorrelationId(eventCorrelationId);
- avcEvent.setEventType(AvcEvent.class.getName());
- avcEvent.setEventSchema("urn:cps:" + AvcEvent.class.getName());
- avcEvent.setEventSchemaVersion("v1");
- avcEvent.setEventSource("NCMP");
- avcEvent.setEventTime(ZonedDateTime.now().format(dateTimeFormatter));
-
- final Map<String, Object> eventPayload = new LinkedHashMap<>();
- eventPayload.put("push-change-update", "{}");
- avcEvent.setEvent(eventPayload);
-
- log.debug("Avc Event Created ID: {}", avcEvent.getEventId());
- return avcEvent;
- }
-
-} \ No newline at end of file
diff --git a/src/main/java/org/onap/cps/ncmp/dmi/notifications/avc/DmiDataAvcEventProducer.java b/src/main/java/org/onap/cps/ncmp/dmi/notifications/avc/DmiDataAvcEventProducer.java
index 4fd46b66..075dcf20 100644
--- a/src/main/java/org/onap/cps/ncmp/dmi/notifications/avc/DmiDataAvcEventProducer.java
+++ b/src/main/java/org/onap/cps/ncmp/dmi/notifications/avc/DmiDataAvcEventProducer.java
@@ -20,9 +20,11 @@
package org.onap.cps.ncmp.dmi.notifications.avc;
+
+import io.cloudevents.CloudEvent;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
-import org.onap.cps.ncmp.event.model.AvcEvent;
+import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
@@ -31,16 +33,18 @@ import org.springframework.stereotype.Service;
@RequiredArgsConstructor
public class DmiDataAvcEventProducer {
- private final KafkaTemplate<String, AvcEvent> kafkaTemplate;
-
+ private final KafkaTemplate<String, CloudEvent> cloudEventKafkaTemplate;
+
/**
- * Sends message to the configured topic with a message key.
+ * Publishing DMI Data AVC event payload as CloudEvent.
*
- * @param requestId the request id
- * @param avcEvent the event to publish
+ * @param requestId the request id
+ * @param cloudAvcEvent event with data as DMI DataAVC event
*/
- public void sendMessage(final String requestId, final AvcEvent avcEvent) {
- kafkaTemplate.send("dmi-cm-events", requestId, avcEvent);
+ public void publishDmiDataAvcCloudEvent(final String requestId, final CloudEvent cloudAvcEvent) {
+ final ProducerRecord<String, CloudEvent> producerRecord =
+ new ProducerRecord<>("dmi-cm-events", requestId, cloudAvcEvent);
+ cloudEventKafkaTemplate.send(producerRecord);
log.debug("AVC event sent");
}
}
diff --git a/src/main/java/org/onap/cps/ncmp/dmi/notifications/avc/DmiDataAvcEventSimulationController.java b/src/main/java/org/onap/cps/ncmp/dmi/notifications/avc/DmiDataAvcEventSimulationController.java
index f7f4bf96..c5fb8fbe 100644
--- a/src/main/java/org/onap/cps/ncmp/dmi/notifications/avc/DmiDataAvcEventSimulationController.java
+++ b/src/main/java/org/onap/cps/ncmp/dmi/notifications/avc/DmiDataAvcEventSimulationController.java
@@ -20,10 +20,10 @@
package org.onap.cps.ncmp.dmi.notifications.avc;
+import io.cloudevents.CloudEvent;
import java.util.UUID;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
-import org.onap.cps.ncmp.event.model.AvcEvent;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
@@ -42,18 +42,18 @@ public class DmiDataAvcEventSimulationController {
/**
* Simulate Event for AVC.
+ *
* @param numberOfSimulatedEvents number of events to be generated
* @return ResponseEntity
*/
@GetMapping(path = "/v1/simulateDmiDataEvent")
- public ResponseEntity<Void> simulateEvents(@RequestParam("numberOfSimulatedEvents")
- final Integer numberOfSimulatedEvents) {
- final DmiDataAvcEventCreator dmiDataAvcEventCreator = new DmiDataAvcEventCreator();
+ public ResponseEntity<Void> simulateEvents(
+ @RequestParam("numberOfSimulatedEvents") final Integer numberOfSimulatedEvents) {
for (int i = 0; i < numberOfSimulatedEvents; i++) {
final String eventCorrelationId = UUID.randomUUID().toString();
- final AvcEvent avcEvent = dmiDataAvcEventCreator.createEvent(eventCorrelationId);
- dmiDataAvcEventProducer.sendMessage(eventCorrelationId, avcEvent);
+ final CloudEvent cloudEvent = DmiDataAvcCloudEventCreator.createCloudEvent(eventCorrelationId);
+ dmiDataAvcEventProducer.publishDmiDataAvcCloudEvent(eventCorrelationId, cloudEvent);
}
return new ResponseEntity<>(HttpStatus.OK);
diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml
index 04001435..d964748f 100644
--- a/src/main/resources/application.yml
+++ b/src/main/resources/application.yml
@@ -51,7 +51,7 @@ spring:
protocol: PLAINTEXT
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
- value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
+ value-serializer: io.cloudevents.kafka.CloudEventSerializer
client-id: ncmp-dmi-plugin
consumer:
group-id: ${NCMP_CONSUMER_GROUP_ID:ncmp-group}
@@ -59,9 +59,13 @@ spring:
value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
properties:
spring.deserializer.key.delegate.class: org.apache.kafka.common.serialization.StringDeserializer
- spring.deserializer.value.delegate.class: org.springframework.kafka.support.serializer.JsonDeserializer
+ spring.deserializer.value.delegate.class: io.cloudevents.kafka.CloudEventDeserializer
spring.json.use.type.headers: false
+ jackson:
+ serialization:
+ FAIL_ON_EMPTY_BEANS: false
+
app:
ncmp:
async:
diff --git a/src/test/groovy/org/onap/cps/ncmp/dmi/api/kafka/MessagingBaseSpec.groovy b/src/test/groovy/org/onap/cps/ncmp/dmi/api/kafka/MessagingBaseSpec.groovy
index e5f00f33..13dd043d 100644
--- a/src/test/groovy/org/onap/cps/ncmp/dmi/api/kafka/MessagingBaseSpec.groovy
+++ b/src/test/groovy/org/onap/cps/ncmp/dmi/api/kafka/MessagingBaseSpec.groovy
@@ -20,10 +20,12 @@
package org.onap.cps.ncmp.dmi.api.kafka
+import io.cloudevents.CloudEvent
+import io.cloudevents.kafka.CloudEventDeserializer
+import io.cloudevents.kafka.CloudEventSerializer
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.kafka.common.serialization.StringSerializer
-import org.springframework.kafka.core.DefaultKafkaConsumerFactory
import org.springframework.kafka.core.DefaultKafkaProducerFactory
import org.springframework.kafka.core.KafkaTemplate
import org.springframework.kafka.support.serializer.JsonSerializer
@@ -45,28 +47,30 @@ class MessagingBaseSpec extends Specification {
static kafkaTestContainer = new KafkaContainer(DockerImageName.parse('registry.nordix.org/onaptest/confluentinc/cp-kafka:6.2.1').asCompatibleSubstituteFor('confluentinc/cp-kafka'))
- def producerConfigProperties() {
+ def producerConfigProperties(valueSerializer) {
return [('bootstrap.servers'): kafkaTestContainer.getBootstrapServers().split(',')[0],
('retries') : 0,
('batch-size') : 16384,
('linger.ms') : 1,
('buffer.memory') : 33554432,
('key.serializer') : StringSerializer,
- ('value.serializer') : JsonSerializer]
+ ('value.serializer') : valueSerializer]
}
- def consumerConfigProperties(consumerGroupId) {
+ def consumerConfigProperties(consumerGroupId, valueDeserializer) {
return [('bootstrap.servers') : kafkaTestContainer.getBootstrapServers().split(',')[0],
('key.deserializer') : StringDeserializer,
- ('value.deserializer'): StringDeserializer,
+ ('value.deserializer'): valueDeserializer,
('auto.offset.reset') : 'earliest',
('group.id') : consumerGroupId
]
}
- def kafkaTemplate = new KafkaTemplate<>(new DefaultKafkaProducerFactory<Integer, String>(producerConfigProperties()))
+ def kafkaTemplate = new KafkaTemplate<>(new DefaultKafkaProducerFactory<Integer, String>(producerConfigProperties(JsonSerializer)))
+ def kafkaConsumer = new KafkaConsumer<>(consumerConfigProperties('ncmp-test-group', StringDeserializer))
- def consumer = new KafkaConsumer<>(consumerConfigProperties('ncmp-group'))
+ def cloudEventKafkaTemplate = new KafkaTemplate(new DefaultKafkaProducerFactory<String, CloudEvent>(producerConfigProperties(CloudEventSerializer)))
+ def cloudEventKafkaConsumer = new KafkaConsumer<>(consumerConfigProperties('ncmp-test-group', CloudEventDeserializer))
@DynamicPropertySource
static void registerKafkaProperties(DynamicPropertyRegistry dynamicPropertyRegistry) {
diff --git a/src/test/groovy/org/onap/cps/ncmp/dmi/config/kafka/KafkaConfigSpec.groovy b/src/test/groovy/org/onap/cps/ncmp/dmi/config/kafka/KafkaConfigSpec.groovy
new file mode 100644
index 00000000..f09434be
--- /dev/null
+++ b/src/test/groovy/org/onap/cps/ncmp/dmi/config/kafka/KafkaConfigSpec.groovy
@@ -0,0 +1,42 @@
+package org.onap.cps.ncmp.dmi.config.kafka
+
+import io.cloudevents.CloudEvent
+import io.cloudevents.kafka.CloudEventDeserializer
+import io.cloudevents.kafka.CloudEventSerializer
+import org.spockframework.spring.EnableSharedInjection
+import org.springframework.beans.factory.annotation.Autowired
+import org.springframework.boot.autoconfigure.kafka.KafkaProperties
+import org.springframework.boot.context.properties.EnableConfigurationProperties
+import org.springframework.boot.test.context.SpringBootTest
+import org.springframework.kafka.core.KafkaTemplate
+import org.springframework.kafka.support.serializer.JsonDeserializer
+import org.springframework.kafka.support.serializer.JsonSerializer
+import spock.lang.Shared
+import spock.lang.Specification
+
+@SpringBootTest(classes = [KafkaProperties, KafkaConfig])
+@EnableSharedInjection
+@EnableConfigurationProperties
+class KafkaConfigSpec extends Specification {
+
+ @Shared
+ @Autowired
+ KafkaTemplate<String, String> legacyEventKafkaTemplate
+
+ @Shared
+ @Autowired
+ KafkaTemplate<String, CloudEvent> cloudEventKafkaTemplate
+
+ def 'Verify kafka template serializer and deserializer configuration for #eventType.'() {
+ expect: 'kafka template is instantiated'
+ assert kafkaTemplateInstance.properties['beanName'] == beanName
+ and: 'verify event key and value serializer'
+ assert kafkaTemplateInstance.properties['producerFactory'].configs['value.serializer'].asType(String.class).contains(valueSerializer.getCanonicalName())
+ and: 'verify event key and value deserializer'
+ assert kafkaTemplateInstance.properties['consumerFactory'].configs['spring.deserializer.value.delegate.class'].asType(String.class).contains(delegateDeserializer.getCanonicalName())
+ where: 'the following event type is used'
+ eventType | kafkaTemplateInstance || beanName | valueSerializer | delegateDeserializer
+ 'legacy event' | legacyEventKafkaTemplate || 'legacyEventKafkaTemplate' | JsonSerializer | JsonDeserializer
+ 'cloud event' | cloudEventKafkaTemplate || 'cloudEventKafkaTemplate' | CloudEventSerializer | CloudEventDeserializer
+ }
+}
diff --git a/src/test/groovy/org/onap/cps/ncmp/dmi/notifications/async/AsyncTaskExecutorIntegrationSpec.groovy b/src/test/groovy/org/onap/cps/ncmp/dmi/notifications/async/AsyncTaskExecutorIntegrationSpec.groovy
index 96e2c16d..7ca2d54c 100644
--- a/src/test/groovy/org/onap/cps/ncmp/dmi/notifications/async/AsyncTaskExecutorIntegrationSpec.groovy
+++ b/src/test/groovy/org/onap/cps/ncmp/dmi/notifications/async/AsyncTaskExecutorIntegrationSpec.groovy
@@ -49,18 +49,18 @@ class AsyncTaskExecutorIntegrationSpec extends MessagingBaseSpec {
def setup() {
cpsAsyncRequestResponseEventProducer.dmiNcmpTopic = TEST_TOPIC
- consumer.subscribe([TEST_TOPIC] as List<String>)
+ kafkaConsumer.subscribe([TEST_TOPIC] as List<String>)
}
def cleanup() {
- consumer.close()
+ kafkaConsumer.close()
}
def 'Publish and Subscribe message - success'() {
when: 'a successful event is published'
objectUnderTest.publishAsyncEvent(TEST_TOPIC, '12345','{}', 'OK', '200')
and: 'the topic is polled'
- def records = consumer.poll(Duration.ofMillis(1500))
+ def records = kafkaConsumer.poll(Duration.ofMillis(1500))
then: 'the record received is the event sent'
def record = records.iterator().next()
DmiAsyncRequestResponseEvent event = spiedObjectMapper.readValue(record.value(), DmiAsyncRequestResponseEvent)
@@ -74,7 +74,7 @@ class AsyncTaskExecutorIntegrationSpec extends MessagingBaseSpec {
def exception = new HttpClientRequestException('some cm handle', 'Node not found', HttpStatus.INTERNAL_SERVER_ERROR)
objectUnderTest.publishAsyncFailureEvent(TEST_TOPIC, '67890', exception)
and: 'the topic is polled'
- def records = consumer.poll(Duration.ofMillis(1500))
+ def records = kafkaConsumer.poll(Duration.ofMillis(1500))
then: 'the record received is the event sent'
def record = records.iterator().next()
DmiAsyncRequestResponseEvent event = spiedObjectMapper.readValue(record.value(), DmiAsyncRequestResponseEvent)
diff --git a/src/test/groovy/org/onap/cps/ncmp/dmi/notifications/avc/AvcEventExecutorIntegrationSpec.groovy b/src/test/groovy/org/onap/cps/ncmp/dmi/notifications/avc/AvcEventExecutorIntegrationSpec.groovy
index 5f7ed878..a7557bb9 100644
--- a/src/test/groovy/org/onap/cps/ncmp/dmi/notifications/avc/AvcEventExecutorIntegrationSpec.groovy
+++ b/src/test/groovy/org/onap/cps/ncmp/dmi/notifications/avc/AvcEventExecutorIntegrationSpec.groovy
@@ -21,12 +21,10 @@
package org.onap.cps.ncmp.dmi.notifications.avc
import com.fasterxml.jackson.databind.ObjectMapper
-import org.apache.kafka.clients.consumer.KafkaConsumer
+import io.cloudevents.core.CloudEventUtils
+import io.cloudevents.jackson.PojoCloudEventDataMapper
import org.onap.cps.ncmp.dmi.api.kafka.MessagingBaseSpec
-import org.onap.cps.ncmp.dmi.notifications.async.AsyncTaskExecutor
-import org.onap.cps.ncmp.dmi.service.DmiService
-import org.onap.cps.ncmp.dmi.notifications.avc.DmiDataAvcEventSimulationController
-import org.onap.cps.ncmp.event.model.AvcEvent
+import org.onap.cps.ncmp.events.avc1_0_0.AvcEvent
import org.spockframework.spring.SpringBean
import org.springframework.boot.test.context.SpringBootTest
import org.springframework.test.annotation.DirtiesContext
@@ -40,9 +38,9 @@ import java.time.Duration
class AvcEventExecutorIntegrationSpec extends MessagingBaseSpec {
@SpringBean
- DmiDataAvcEventProducer dmiDataAvcEventProducer = new DmiDataAvcEventProducer(kafkaTemplate)
+ DmiDataAvcEventProducer dmiDataAvcEventProducer = new DmiDataAvcEventProducer(cloudEventKafkaTemplate)
- def dmiService = new DmiDataAvcEventSimulationController(dmiDataAvcEventProducer)
+ def dmiService = new DmiDataAvcEventSimulationController(dmiDataAvcEventProducer)
def objectMapper = new ObjectMapper()
@@ -50,13 +48,14 @@ class AvcEventExecutorIntegrationSpec extends MessagingBaseSpec {
given: 'a simulated event'
dmiService.simulateEvents(1)
and: 'a consumer subscribed to dmi-cm-events topic'
- def consumer = new KafkaConsumer<>(consumerConfigProperties('test'))
- consumer.subscribe(['dmi-cm-events'])
+ cloudEventKafkaConsumer.subscribe(['dmi-cm-events'])
when: 'the next event record is consumed'
- def record = consumer.poll(Duration.ofMillis(1500)).iterator().next()
+ def record = cloudEventKafkaConsumer.poll(Duration.ofMillis(1500)).iterator().next()
then: 'record has correct topic'
assert record.topic == 'dmi-cm-events'
and: 'the record value can be mapped to an avcEvent'
- objectMapper.readValue(record.value(), AvcEvent)
+ def dmiDataAvcEvent = record.value()
+ def convertedAvcEvent = CloudEventUtils.mapData(dmiDataAvcEvent, PojoCloudEventDataMapper.from(objectMapper, AvcEvent.class)).getValue()
+ assert convertedAvcEvent != null
}
} \ No newline at end of file
diff --git a/src/test/groovy/org/onap/cps/ncmp/dmi/notifications/avc/SubscriptionEventConsumerSpec.groovy b/src/test/groovy/org/onap/cps/ncmp/dmi/notifications/avc/SubscriptionEventConsumerSpec.groovy
index 65567ef8..59873ecf 100644
--- a/src/test/groovy/org/onap/cps/ncmp/dmi/notifications/avc/SubscriptionEventConsumerSpec.groovy
+++ b/src/test/groovy/org/onap/cps/ncmp/dmi/notifications/avc/SubscriptionEventConsumerSpec.groovy
@@ -22,6 +22,7 @@ package org.onap.cps.ncmp.dmi.notifications.avc
import com.fasterxml.jackson.databind.ObjectMapper
import org.apache.kafka.clients.consumer.KafkaConsumer
+import org.apache.kafka.common.serialization.StringDeserializer
import org.onap.cps.ncmp.dmi.TestUtils
import org.onap.cps.ncmp.dmi.api.kafka.MessagingBaseSpec
import org.onap.cps.ncmp.dmi.service.model.SubscriptionEventResponse
@@ -39,8 +40,6 @@ import java.time.Duration
@DirtiesContext
class SubscriptionEventConsumerSpec extends MessagingBaseSpec {
- def kafkaConsumer = new KafkaConsumer<>(consumerConfigProperties('ncmp-group'))
-
def objectMapper = new ObjectMapper()
def testTopic = 'dmi-ncmp-cm-avc-subscription'
diff --git a/src/test/resources/application.yml b/src/test/resources/application.yml
index d2aafbd7..43eb0fc5 100644
--- a/src/test/resources/application.yml
+++ b/src/test/resources/application.yml
@@ -59,8 +59,16 @@ spring:
protocol: PLAINTEXT
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
- value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
+ value-serializer: io.cloudevents.kafka.CloudEventSerializer
client-id: ncmp-dmi-plugin
+ consumer:
+ group-id: ${NCMP_CONSUMER_GROUP_ID:ncmp-group}
+ key-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
+ value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
+ properties:
+ spring.deserializer.key.delegate.class: org.apache.kafka.common.serialization.StringDeserializer
+ spring.deserializer.value.delegate.class: io.cloudevents.kafka.CloudEventDeserializer
+ spring.json.use.type.headers: false
app:
ncmp: