diff options
Diffstat (limited to 'src')
12 files changed, 332 insertions, 105 deletions
diff --git a/src/main/java/org/onap/cps/ncmp/dmi/config/kafka/KafkaConfig.java b/src/main/java/org/onap/cps/ncmp/dmi/config/kafka/KafkaConfig.java new file mode 100644 index 00000000..cb617f9e --- /dev/null +++ b/src/main/java/org/onap/cps/ncmp/dmi/config/kafka/KafkaConfig.java @@ -0,0 +1,127 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (C) 2023 Nordix Foundation + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.cps.ncmp.dmi.config.kafka; + +import io.cloudevents.CloudEvent; +import java.util.Map; +import lombok.RequiredArgsConstructor; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.springframework.boot.autoconfigure.kafka.KafkaProperties; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Primary; +import org.springframework.kafka.annotation.EnableKafka; +import org.springframework.kafka.core.ConsumerFactory; +import org.springframework.kafka.core.DefaultKafkaConsumerFactory; +import org.springframework.kafka.core.DefaultKafkaProducerFactory; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.core.ProducerFactory; +import org.springframework.kafka.support.serializer.JsonDeserializer; +import org.springframework.kafka.support.serializer.JsonSerializer; + +/** + * kafka Configuration for legacy and cloud events. + * + * @param <T> valid legacy event to be published over the wire. + */ +@Configuration +@EnableKafka +@RequiredArgsConstructor +public class KafkaConfig<T> { + + private final KafkaProperties kafkaProperties; + + /** + * This sets the strategy for creating legacy Kafka producer instance from kafka properties defined into + * application.yml and replaces value-serializer by JsonSerializer. + * + * @return legacy event producer instance. + */ + @Bean + public ProducerFactory<String, T> legacyEventProducerFactory() { + final Map<String, Object> producerConfigProperties = kafkaProperties.buildProducerProperties(); + producerConfigProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); + return new DefaultKafkaProducerFactory<>(producerConfigProperties); + } + + /** + * The ConsumerFactory implementation is to produce new legacy instance for provided kafka properties defined + * into application.yml and replaces deserializer-value by JsonDeserializer. + * + * @return an instance of legacy consumer factory. + */ + @Bean + public ConsumerFactory<String, T> legacyEventConsumerFactory() { + final Map<String, Object> consumerConfigProperties = kafkaProperties.buildConsumerProperties(); + consumerConfigProperties.put("spring.deserializer.value.delegate.class", JsonDeserializer.class); + return new DefaultKafkaConsumerFactory<>(consumerConfigProperties); + } + + /** + * This sets the strategy for creating cloud Kafka producer instance from kafka properties defined into + * application.yml with CloudEventSerializer. + * + * @return cloud event producer instance. + */ + @Bean + public ProducerFactory<String, CloudEvent> cloudEventProducerFactory() { + final Map<String, Object> producerConfigProperties = kafkaProperties.buildProducerProperties(); + return new DefaultKafkaProducerFactory<>(producerConfigProperties); + } + + /** + * The ConsumerFactory implementation to produce new legacy instance for provided kafka properties defined + * into application.yml having CloudEventDeserializer as deserializer-value. + * + * @return an instance of cloud consumer factory. + */ + @Bean + public ConsumerFactory<String, CloudEvent> cloudEventConsumerFactory() { + final Map<String, Object> consumerConfigProperties = kafkaProperties.buildConsumerProperties(); + return new DefaultKafkaConsumerFactory<>(consumerConfigProperties); + } + + /** + * A legacy Kafka event template for executing high-level operations. The legacy producer factory ensure this. + * + * @return an instance of legacy Kafka template. + */ + @Bean + @Primary + public KafkaTemplate<String, T> legacyEventKafkaTemplate() { + final KafkaTemplate<String, T> kafkaTemplate = new KafkaTemplate<>(legacyEventProducerFactory()); + kafkaTemplate.setConsumerFactory(legacyEventConsumerFactory()); + return kafkaTemplate; + } + + /** + * A cloud Kafka event template for executing high-level operations. The cloud producer factory ensure this. + * + * @return an instance of cloud Kafka template. + */ + @Bean + public KafkaTemplate<String, CloudEvent> cloudEventKafkaTemplate() { + final KafkaTemplate<String, CloudEvent> kafkaTemplate = new KafkaTemplate<>(cloudEventProducerFactory()); + kafkaTemplate.setConsumerFactory(cloudEventConsumerFactory()); + return kafkaTemplate; + } + +} diff --git a/src/main/java/org/onap/cps/ncmp/dmi/notifications/avc/DmiDataAvcCloudEventCreator.java b/src/main/java/org/onap/cps/ncmp/dmi/notifications/avc/DmiDataAvcCloudEventCreator.java new file mode 100644 index 00000000..b8bd277d --- /dev/null +++ b/src/main/java/org/onap/cps/ncmp/dmi/notifications/avc/DmiDataAvcCloudEventCreator.java @@ -0,0 +1,104 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (C) 2023 Nordix Foundation + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.cps.ncmp.dmi.notifications.avc; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import io.cloudevents.CloudEvent; +import io.cloudevents.core.builder.CloudEventBuilder; +import java.net.URI; +import java.time.format.DateTimeFormatter; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import lombok.AccessLevel; +import lombok.NoArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.onap.cps.ncmp.events.avc1_0_0.AvcEvent; +import org.onap.cps.ncmp.events.avc1_0_0.Data; +import org.onap.cps.ncmp.events.avc1_0_0.DatastoreChanges; +import org.onap.cps.ncmp.events.avc1_0_0.Edit; +import org.onap.cps.ncmp.events.avc1_0_0.IetfYangPatchYangPatch; +import org.onap.cps.ncmp.events.avc1_0_0.PushChangeUpdate; +import org.onap.cps.ncmp.events.avc1_0_0.Value; + +/** + * Helper to create AvcEvents. + */ +@Slf4j +@NoArgsConstructor(access = AccessLevel.PRIVATE) +public class DmiDataAvcCloudEventCreator { + + private static final DateTimeFormatter dateTimeFormatter = + DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSZ"); + + private static final ObjectMapper objectMapper = new ObjectMapper(); + + /** + * Creates CloudEvent for DMI Data AVC. + * + * @param eventCorrelationId correlationid + * @return Cloud Event + */ + public static CloudEvent createCloudEvent(final String eventCorrelationId) { + + CloudEvent cloudEvent = null; + + try { + cloudEvent = CloudEventBuilder.v1().withId(UUID.randomUUID().toString()).withSource(URI.create("NCMP")) + .withType(AvcEvent.class.getName()) + .withDataSchema(URI.create("urn:cps:" + AvcEvent.class.getName() + ":1.0.0")) + .withExtension("correlationid", eventCorrelationId) + .withData(objectMapper.writeValueAsBytes(createDmiDataAvcEvent())).build(); + } catch (final JsonProcessingException jsonProcessingException) { + log.error("Unable to convert object to json : {}", jsonProcessingException.getMessage()); + } + + return cloudEvent; + } + + private static AvcEvent createDmiDataAvcEvent() { + final AvcEvent avcEvent = new AvcEvent(); + final Data data = new Data(); + final PushChangeUpdate pushChangeUpdate = new PushChangeUpdate(); + final DatastoreChanges datastoreChanges = new DatastoreChanges(); + final IetfYangPatchYangPatch ietfYangPatchYangPatch = new IetfYangPatchYangPatch(); + ietfYangPatchYangPatch.setPatchId("abcd"); + final Edit edit1 = new Edit(); + final Value value = new Value(); + final Map<String, Object> attributeMap = new LinkedHashMap<>(); + attributeMap.put("isHoAllowed", false); + value.setAttributes(List.of(attributeMap)); + edit1.setEditId("editId"); + edit1.setOperation("replace"); + edit1.setTarget("target_xpath"); + edit1.setValue(value); + ietfYangPatchYangPatch.setEdit(List.of(edit1)); + datastoreChanges.setIetfYangPatchYangPatch(ietfYangPatchYangPatch); + pushChangeUpdate.setDatastoreChanges(datastoreChanges); + data.setPushChangeUpdate(pushChangeUpdate); + + avcEvent.setData(data); + return avcEvent; + } + +}
\ No newline at end of file diff --git a/src/main/java/org/onap/cps/ncmp/dmi/notifications/avc/DmiDataAvcEventCreator.java b/src/main/java/org/onap/cps/ncmp/dmi/notifications/avc/DmiDataAvcEventCreator.java deleted file mode 100644 index 03ed1c4c..00000000 --- a/src/main/java/org/onap/cps/ncmp/dmi/notifications/avc/DmiDataAvcEventCreator.java +++ /dev/null @@ -1,64 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * Copyright (C) 2023 Nordix Foundation - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * SPDX-License-Identifier: Apache-2.0 - * ============LICENSE_END========================================================= - */ - -package org.onap.cps.ncmp.dmi.notifications.avc; - -import java.time.ZonedDateTime; -import java.time.format.DateTimeFormatter; -import java.util.LinkedHashMap; -import java.util.Map; -import java.util.UUID; -import lombok.extern.slf4j.Slf4j; -import org.onap.cps.ncmp.event.model.AvcEvent; - -/** - * Helper to create AvcEvents. - */ -@Slf4j -public class DmiDataAvcEventCreator { - - private static final DateTimeFormatter dateTimeFormatter - = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSZ"); - - /** - * Create an AVC event. - * - * @param eventCorrelationId the event correlation id - * @return DmiAsyncRequestResponseEvent - */ - public AvcEvent createEvent(final String eventCorrelationId) { - final AvcEvent avcEvent = new AvcEvent(); - avcEvent.setEventId(UUID.randomUUID().toString()); - avcEvent.setEventCorrelationId(eventCorrelationId); - avcEvent.setEventType(AvcEvent.class.getName()); - avcEvent.setEventSchema("urn:cps:" + AvcEvent.class.getName()); - avcEvent.setEventSchemaVersion("v1"); - avcEvent.setEventSource("NCMP"); - avcEvent.setEventTime(ZonedDateTime.now().format(dateTimeFormatter)); - - final Map<String, Object> eventPayload = new LinkedHashMap<>(); - eventPayload.put("push-change-update", "{}"); - avcEvent.setEvent(eventPayload); - - log.debug("Avc Event Created ID: {}", avcEvent.getEventId()); - return avcEvent; - } - -}
\ No newline at end of file diff --git a/src/main/java/org/onap/cps/ncmp/dmi/notifications/avc/DmiDataAvcEventProducer.java b/src/main/java/org/onap/cps/ncmp/dmi/notifications/avc/DmiDataAvcEventProducer.java index 4fd46b66..075dcf20 100644 --- a/src/main/java/org/onap/cps/ncmp/dmi/notifications/avc/DmiDataAvcEventProducer.java +++ b/src/main/java/org/onap/cps/ncmp/dmi/notifications/avc/DmiDataAvcEventProducer.java @@ -20,9 +20,11 @@ package org.onap.cps.ncmp.dmi.notifications.avc; + +import io.cloudevents.CloudEvent; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import org.onap.cps.ncmp.event.model.AvcEvent; +import org.apache.kafka.clients.producer.ProducerRecord; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.stereotype.Service; @@ -31,16 +33,18 @@ import org.springframework.stereotype.Service; @RequiredArgsConstructor public class DmiDataAvcEventProducer { - private final KafkaTemplate<String, AvcEvent> kafkaTemplate; - + private final KafkaTemplate<String, CloudEvent> cloudEventKafkaTemplate; + /** - * Sends message to the configured topic with a message key. + * Publishing DMI Data AVC event payload as CloudEvent. * - * @param requestId the request id - * @param avcEvent the event to publish + * @param requestId the request id + * @param cloudAvcEvent event with data as DMI DataAVC event */ - public void sendMessage(final String requestId, final AvcEvent avcEvent) { - kafkaTemplate.send("dmi-cm-events", requestId, avcEvent); + public void publishDmiDataAvcCloudEvent(final String requestId, final CloudEvent cloudAvcEvent) { + final ProducerRecord<String, CloudEvent> producerRecord = + new ProducerRecord<>("dmi-cm-events", requestId, cloudAvcEvent); + cloudEventKafkaTemplate.send(producerRecord); log.debug("AVC event sent"); } } diff --git a/src/main/java/org/onap/cps/ncmp/dmi/notifications/avc/DmiDataAvcEventSimulationController.java b/src/main/java/org/onap/cps/ncmp/dmi/notifications/avc/DmiDataAvcEventSimulationController.java index f7f4bf96..c5fb8fbe 100644 --- a/src/main/java/org/onap/cps/ncmp/dmi/notifications/avc/DmiDataAvcEventSimulationController.java +++ b/src/main/java/org/onap/cps/ncmp/dmi/notifications/avc/DmiDataAvcEventSimulationController.java @@ -20,10 +20,10 @@ package org.onap.cps.ncmp.dmi.notifications.avc; +import io.cloudevents.CloudEvent; import java.util.UUID; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import org.onap.cps.ncmp.event.model.AvcEvent; import org.springframework.http.HttpStatus; import org.springframework.http.ResponseEntity; import org.springframework.web.bind.annotation.GetMapping; @@ -42,18 +42,18 @@ public class DmiDataAvcEventSimulationController { /** * Simulate Event for AVC. + * * @param numberOfSimulatedEvents number of events to be generated * @return ResponseEntity */ @GetMapping(path = "/v1/simulateDmiDataEvent") - public ResponseEntity<Void> simulateEvents(@RequestParam("numberOfSimulatedEvents") - final Integer numberOfSimulatedEvents) { - final DmiDataAvcEventCreator dmiDataAvcEventCreator = new DmiDataAvcEventCreator(); + public ResponseEntity<Void> simulateEvents( + @RequestParam("numberOfSimulatedEvents") final Integer numberOfSimulatedEvents) { for (int i = 0; i < numberOfSimulatedEvents; i++) { final String eventCorrelationId = UUID.randomUUID().toString(); - final AvcEvent avcEvent = dmiDataAvcEventCreator.createEvent(eventCorrelationId); - dmiDataAvcEventProducer.sendMessage(eventCorrelationId, avcEvent); + final CloudEvent cloudEvent = DmiDataAvcCloudEventCreator.createCloudEvent(eventCorrelationId); + dmiDataAvcEventProducer.publishDmiDataAvcCloudEvent(eventCorrelationId, cloudEvent); } return new ResponseEntity<>(HttpStatus.OK); diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 04001435..d964748f 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -51,7 +51,7 @@ spring: protocol: PLAINTEXT producer: key-serializer: org.apache.kafka.common.serialization.StringSerializer - value-serializer: org.springframework.kafka.support.serializer.JsonSerializer + value-serializer: io.cloudevents.kafka.CloudEventSerializer client-id: ncmp-dmi-plugin consumer: group-id: ${NCMP_CONSUMER_GROUP_ID:ncmp-group} @@ -59,9 +59,13 @@ spring: value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer properties: spring.deserializer.key.delegate.class: org.apache.kafka.common.serialization.StringDeserializer - spring.deserializer.value.delegate.class: org.springframework.kafka.support.serializer.JsonDeserializer + spring.deserializer.value.delegate.class: io.cloudevents.kafka.CloudEventDeserializer spring.json.use.type.headers: false + jackson: + serialization: + FAIL_ON_EMPTY_BEANS: false + app: ncmp: async: diff --git a/src/test/groovy/org/onap/cps/ncmp/dmi/api/kafka/MessagingBaseSpec.groovy b/src/test/groovy/org/onap/cps/ncmp/dmi/api/kafka/MessagingBaseSpec.groovy index e5f00f33..13dd043d 100644 --- a/src/test/groovy/org/onap/cps/ncmp/dmi/api/kafka/MessagingBaseSpec.groovy +++ b/src/test/groovy/org/onap/cps/ncmp/dmi/api/kafka/MessagingBaseSpec.groovy @@ -20,10 +20,12 @@ package org.onap.cps.ncmp.dmi.api.kafka +import io.cloudevents.CloudEvent +import io.cloudevents.kafka.CloudEventDeserializer +import io.cloudevents.kafka.CloudEventSerializer import org.apache.kafka.clients.consumer.KafkaConsumer import org.apache.kafka.common.serialization.StringDeserializer import org.apache.kafka.common.serialization.StringSerializer -import org.springframework.kafka.core.DefaultKafkaConsumerFactory import org.springframework.kafka.core.DefaultKafkaProducerFactory import org.springframework.kafka.core.KafkaTemplate import org.springframework.kafka.support.serializer.JsonSerializer @@ -45,28 +47,30 @@ class MessagingBaseSpec extends Specification { static kafkaTestContainer = new KafkaContainer(DockerImageName.parse('registry.nordix.org/onaptest/confluentinc/cp-kafka:6.2.1').asCompatibleSubstituteFor('confluentinc/cp-kafka')) - def producerConfigProperties() { + def producerConfigProperties(valueSerializer) { return [('bootstrap.servers'): kafkaTestContainer.getBootstrapServers().split(',')[0], ('retries') : 0, ('batch-size') : 16384, ('linger.ms') : 1, ('buffer.memory') : 33554432, ('key.serializer') : StringSerializer, - ('value.serializer') : JsonSerializer] + ('value.serializer') : valueSerializer] } - def consumerConfigProperties(consumerGroupId) { + def consumerConfigProperties(consumerGroupId, valueDeserializer) { return [('bootstrap.servers') : kafkaTestContainer.getBootstrapServers().split(',')[0], ('key.deserializer') : StringDeserializer, - ('value.deserializer'): StringDeserializer, + ('value.deserializer'): valueDeserializer, ('auto.offset.reset') : 'earliest', ('group.id') : consumerGroupId ] } - def kafkaTemplate = new KafkaTemplate<>(new DefaultKafkaProducerFactory<Integer, String>(producerConfigProperties())) + def kafkaTemplate = new KafkaTemplate<>(new DefaultKafkaProducerFactory<Integer, String>(producerConfigProperties(JsonSerializer))) + def kafkaConsumer = new KafkaConsumer<>(consumerConfigProperties('ncmp-test-group', StringDeserializer)) - def consumer = new KafkaConsumer<>(consumerConfigProperties('ncmp-group')) + def cloudEventKafkaTemplate = new KafkaTemplate(new DefaultKafkaProducerFactory<String, CloudEvent>(producerConfigProperties(CloudEventSerializer))) + def cloudEventKafkaConsumer = new KafkaConsumer<>(consumerConfigProperties('ncmp-test-group', CloudEventDeserializer)) @DynamicPropertySource static void registerKafkaProperties(DynamicPropertyRegistry dynamicPropertyRegistry) { diff --git a/src/test/groovy/org/onap/cps/ncmp/dmi/config/kafka/KafkaConfigSpec.groovy b/src/test/groovy/org/onap/cps/ncmp/dmi/config/kafka/KafkaConfigSpec.groovy new file mode 100644 index 00000000..f09434be --- /dev/null +++ b/src/test/groovy/org/onap/cps/ncmp/dmi/config/kafka/KafkaConfigSpec.groovy @@ -0,0 +1,42 @@ +package org.onap.cps.ncmp.dmi.config.kafka + +import io.cloudevents.CloudEvent +import io.cloudevents.kafka.CloudEventDeserializer +import io.cloudevents.kafka.CloudEventSerializer +import org.spockframework.spring.EnableSharedInjection +import org.springframework.beans.factory.annotation.Autowired +import org.springframework.boot.autoconfigure.kafka.KafkaProperties +import org.springframework.boot.context.properties.EnableConfigurationProperties +import org.springframework.boot.test.context.SpringBootTest +import org.springframework.kafka.core.KafkaTemplate +import org.springframework.kafka.support.serializer.JsonDeserializer +import org.springframework.kafka.support.serializer.JsonSerializer +import spock.lang.Shared +import spock.lang.Specification + +@SpringBootTest(classes = [KafkaProperties, KafkaConfig]) +@EnableSharedInjection +@EnableConfigurationProperties +class KafkaConfigSpec extends Specification { + + @Shared + @Autowired + KafkaTemplate<String, String> legacyEventKafkaTemplate + + @Shared + @Autowired + KafkaTemplate<String, CloudEvent> cloudEventKafkaTemplate + + def 'Verify kafka template serializer and deserializer configuration for #eventType.'() { + expect: 'kafka template is instantiated' + assert kafkaTemplateInstance.properties['beanName'] == beanName + and: 'verify event key and value serializer' + assert kafkaTemplateInstance.properties['producerFactory'].configs['value.serializer'].asType(String.class).contains(valueSerializer.getCanonicalName()) + and: 'verify event key and value deserializer' + assert kafkaTemplateInstance.properties['consumerFactory'].configs['spring.deserializer.value.delegate.class'].asType(String.class).contains(delegateDeserializer.getCanonicalName()) + where: 'the following event type is used' + eventType | kafkaTemplateInstance || beanName | valueSerializer | delegateDeserializer + 'legacy event' | legacyEventKafkaTemplate || 'legacyEventKafkaTemplate' | JsonSerializer | JsonDeserializer + 'cloud event' | cloudEventKafkaTemplate || 'cloudEventKafkaTemplate' | CloudEventSerializer | CloudEventDeserializer + } +} diff --git a/src/test/groovy/org/onap/cps/ncmp/dmi/notifications/async/AsyncTaskExecutorIntegrationSpec.groovy b/src/test/groovy/org/onap/cps/ncmp/dmi/notifications/async/AsyncTaskExecutorIntegrationSpec.groovy index 96e2c16d..7ca2d54c 100644 --- a/src/test/groovy/org/onap/cps/ncmp/dmi/notifications/async/AsyncTaskExecutorIntegrationSpec.groovy +++ b/src/test/groovy/org/onap/cps/ncmp/dmi/notifications/async/AsyncTaskExecutorIntegrationSpec.groovy @@ -49,18 +49,18 @@ class AsyncTaskExecutorIntegrationSpec extends MessagingBaseSpec { def setup() { cpsAsyncRequestResponseEventProducer.dmiNcmpTopic = TEST_TOPIC - consumer.subscribe([TEST_TOPIC] as List<String>) + kafkaConsumer.subscribe([TEST_TOPIC] as List<String>) } def cleanup() { - consumer.close() + kafkaConsumer.close() } def 'Publish and Subscribe message - success'() { when: 'a successful event is published' objectUnderTest.publishAsyncEvent(TEST_TOPIC, '12345','{}', 'OK', '200') and: 'the topic is polled' - def records = consumer.poll(Duration.ofMillis(1500)) + def records = kafkaConsumer.poll(Duration.ofMillis(1500)) then: 'the record received is the event sent' def record = records.iterator().next() DmiAsyncRequestResponseEvent event = spiedObjectMapper.readValue(record.value(), DmiAsyncRequestResponseEvent) @@ -74,7 +74,7 @@ class AsyncTaskExecutorIntegrationSpec extends MessagingBaseSpec { def exception = new HttpClientRequestException('some cm handle', 'Node not found', HttpStatus.INTERNAL_SERVER_ERROR) objectUnderTest.publishAsyncFailureEvent(TEST_TOPIC, '67890', exception) and: 'the topic is polled' - def records = consumer.poll(Duration.ofMillis(1500)) + def records = kafkaConsumer.poll(Duration.ofMillis(1500)) then: 'the record received is the event sent' def record = records.iterator().next() DmiAsyncRequestResponseEvent event = spiedObjectMapper.readValue(record.value(), DmiAsyncRequestResponseEvent) diff --git a/src/test/groovy/org/onap/cps/ncmp/dmi/notifications/avc/AvcEventExecutorIntegrationSpec.groovy b/src/test/groovy/org/onap/cps/ncmp/dmi/notifications/avc/AvcEventExecutorIntegrationSpec.groovy index 5f7ed878..a7557bb9 100644 --- a/src/test/groovy/org/onap/cps/ncmp/dmi/notifications/avc/AvcEventExecutorIntegrationSpec.groovy +++ b/src/test/groovy/org/onap/cps/ncmp/dmi/notifications/avc/AvcEventExecutorIntegrationSpec.groovy @@ -21,12 +21,10 @@ package org.onap.cps.ncmp.dmi.notifications.avc import com.fasterxml.jackson.databind.ObjectMapper -import org.apache.kafka.clients.consumer.KafkaConsumer +import io.cloudevents.core.CloudEventUtils +import io.cloudevents.jackson.PojoCloudEventDataMapper import org.onap.cps.ncmp.dmi.api.kafka.MessagingBaseSpec -import org.onap.cps.ncmp.dmi.notifications.async.AsyncTaskExecutor -import org.onap.cps.ncmp.dmi.service.DmiService -import org.onap.cps.ncmp.dmi.notifications.avc.DmiDataAvcEventSimulationController -import org.onap.cps.ncmp.event.model.AvcEvent +import org.onap.cps.ncmp.events.avc1_0_0.AvcEvent import org.spockframework.spring.SpringBean import org.springframework.boot.test.context.SpringBootTest import org.springframework.test.annotation.DirtiesContext @@ -40,9 +38,9 @@ import java.time.Duration class AvcEventExecutorIntegrationSpec extends MessagingBaseSpec { @SpringBean - DmiDataAvcEventProducer dmiDataAvcEventProducer = new DmiDataAvcEventProducer(kafkaTemplate) + DmiDataAvcEventProducer dmiDataAvcEventProducer = new DmiDataAvcEventProducer(cloudEventKafkaTemplate) - def dmiService = new DmiDataAvcEventSimulationController(dmiDataAvcEventProducer) + def dmiService = new DmiDataAvcEventSimulationController(dmiDataAvcEventProducer) def objectMapper = new ObjectMapper() @@ -50,13 +48,14 @@ class AvcEventExecutorIntegrationSpec extends MessagingBaseSpec { given: 'a simulated event' dmiService.simulateEvents(1) and: 'a consumer subscribed to dmi-cm-events topic' - def consumer = new KafkaConsumer<>(consumerConfigProperties('test')) - consumer.subscribe(['dmi-cm-events']) + cloudEventKafkaConsumer.subscribe(['dmi-cm-events']) when: 'the next event record is consumed' - def record = consumer.poll(Duration.ofMillis(1500)).iterator().next() + def record = cloudEventKafkaConsumer.poll(Duration.ofMillis(1500)).iterator().next() then: 'record has correct topic' assert record.topic == 'dmi-cm-events' and: 'the record value can be mapped to an avcEvent' - objectMapper.readValue(record.value(), AvcEvent) + def dmiDataAvcEvent = record.value() + def convertedAvcEvent = CloudEventUtils.mapData(dmiDataAvcEvent, PojoCloudEventDataMapper.from(objectMapper, AvcEvent.class)).getValue() + assert convertedAvcEvent != null } }
\ No newline at end of file diff --git a/src/test/groovy/org/onap/cps/ncmp/dmi/notifications/avc/SubscriptionEventConsumerSpec.groovy b/src/test/groovy/org/onap/cps/ncmp/dmi/notifications/avc/SubscriptionEventConsumerSpec.groovy index 65567ef8..59873ecf 100644 --- a/src/test/groovy/org/onap/cps/ncmp/dmi/notifications/avc/SubscriptionEventConsumerSpec.groovy +++ b/src/test/groovy/org/onap/cps/ncmp/dmi/notifications/avc/SubscriptionEventConsumerSpec.groovy @@ -22,6 +22,7 @@ package org.onap.cps.ncmp.dmi.notifications.avc import com.fasterxml.jackson.databind.ObjectMapper import org.apache.kafka.clients.consumer.KafkaConsumer +import org.apache.kafka.common.serialization.StringDeserializer import org.onap.cps.ncmp.dmi.TestUtils import org.onap.cps.ncmp.dmi.api.kafka.MessagingBaseSpec import org.onap.cps.ncmp.dmi.service.model.SubscriptionEventResponse @@ -39,8 +40,6 @@ import java.time.Duration @DirtiesContext class SubscriptionEventConsumerSpec extends MessagingBaseSpec { - def kafkaConsumer = new KafkaConsumer<>(consumerConfigProperties('ncmp-group')) - def objectMapper = new ObjectMapper() def testTopic = 'dmi-ncmp-cm-avc-subscription' diff --git a/src/test/resources/application.yml b/src/test/resources/application.yml index d2aafbd7..43eb0fc5 100644 --- a/src/test/resources/application.yml +++ b/src/test/resources/application.yml @@ -59,8 +59,16 @@ spring: protocol: PLAINTEXT producer: key-serializer: org.apache.kafka.common.serialization.StringSerializer - value-serializer: org.springframework.kafka.support.serializer.JsonSerializer + value-serializer: io.cloudevents.kafka.CloudEventSerializer client-id: ncmp-dmi-plugin + consumer: + group-id: ${NCMP_CONSUMER_GROUP_ID:ncmp-group} + key-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer + value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer + properties: + spring.deserializer.key.delegate.class: org.apache.kafka.common.serialization.StringDeserializer + spring.deserializer.value.delegate.class: io.cloudevents.kafka.CloudEventDeserializer + spring.json.use.type.headers: false app: ncmp: |