From f31c7f8bd4985c84f9126d071439c1a4de57f704 Mon Sep 17 00:00:00 2001 From: JosephKeenan Date: Tue, 24 May 2022 18:59:25 +0100 Subject: Async request response NCMP -> Client -Added consumer for DMI events and producer for forwarding to client -Added schemas for events -Updated tests -Added new module for ncmp events -Used mapstruct for event mapping Issue-ID: CPS-830 Change-Id: I096d08af9d69092cf8651e11eaa00ce441fc3605 Signed-off-by: sourabh_sourabh Signed-off-by: JosephKeenan Signed-off-by: ToineSiebelink Signed-off-by: JosephKeenan --- ...AsyncRequestResponseEventIntegrationSpec.groovy | 126 +++++++++++++++++++++ .../ncmp/api/utils/DmiServiceUrlBuilderSpec.groovy | 6 +- 2 files changed, 129 insertions(+), 3 deletions(-) create mode 100644 cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/async/CpsAsyncRequestResponseEventIntegrationSpec.groovy (limited to 'cps-ncmp-service/src/test/groovy/org') diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/async/CpsAsyncRequestResponseEventIntegrationSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/async/CpsAsyncRequestResponseEventIntegrationSpec.groovy new file mode 100644 index 000000000..aa6bf1a78 --- /dev/null +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/async/CpsAsyncRequestResponseEventIntegrationSpec.groovy @@ -0,0 +1,126 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (c) 2022 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an 'AS IS' BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.cps.ncmp.api.impl.async + +import org.apache.kafka.clients.consumer.ConsumerConfig +import org.apache.kafka.clients.producer.ProducerConfig +import org.apache.kafka.common.serialization.StringSerializer +import org.mapstruct.factory.Mappers +import org.onap.cps.ncmp.event.model.DmiAsyncRequestResponseEvent +import org.onap.cps.ncmp.event.model.NcmpAsyncRequestResponseEvent +import org.springframework.kafka.core.DefaultKafkaProducerFactory +import org.springframework.kafka.core.KafkaTemplate +import org.springframework.kafka.support.serializer.JsonSerializer +import org.testcontainers.containers.KafkaContainer +import org.testcontainers.spock.Testcontainers +import org.testcontainers.utility.DockerImageName + +import java.time.Duration +import com.fasterxml.jackson.databind.ObjectMapper +import org.onap.cps.utils.JsonObjectMapper +import org.apache.kafka.clients.consumer.KafkaConsumer +import org.apache.kafka.common.serialization.StringDeserializer +import org.onap.cps.ncmp.utils.TestUtils; +import org.springframework.boot.test.context.SpringBootTest +import org.spockframework.spring.SpringBean +import org.springframework.test.annotation.DirtiesContext +import org.springframework.test.context.DynamicPropertyRegistry +import org.springframework.test.context.DynamicPropertySource +import spock.lang.Specification + +@SpringBootTest(classes = [NcmpAsyncRequestResponseEventProducer, NcmpAsyncRequestResponseEventConsumer]) +@Testcontainers +@DirtiesContext +class NcmpAsyncRequestResponseEventProducerIntegrationSpec extends Specification { + + static kafkaTestContainer = new KafkaContainer( + DockerImageName.parse('confluentinc/cp-kafka:6.2.1') + ) + + static { + Runtime.getRuntime().addShutdownHook(new Thread(kafkaTestContainer::stop)) + } + + def setupSpec() { + kafkaTestContainer.start() + } + + def producerConfigProperties = [ + (ProducerConfig.BOOTSTRAP_SERVERS_CONFIG) : kafkaTestContainer.getBootstrapServers().split(',')[0], + (ProducerConfig.RETRIES_CONFIG) : 0, + (ProducerConfig.BATCH_SIZE_CONFIG) : 16384, + (ProducerConfig.LINGER_MS_CONFIG) : 1, + (ProducerConfig.BUFFER_MEMORY_CONFIG) : 33554432, + (ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG) : StringSerializer, + (ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG) : JsonSerializer + ] + + def consumerConfigProperties = [ + (ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG) : kafkaTestContainer.getBootstrapServers().split(',')[0], + (ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG) : StringDeserializer, + (ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG): StringDeserializer, + (ConsumerConfig.AUTO_OFFSET_RESET_CONFIG) : 'earliest', + (ConsumerConfig.GROUP_ID_CONFIG) : 'test' + ] + + def kafkaTemplate = new KafkaTemplate<>(new DefaultKafkaProducerFactory(producerConfigProperties)) + + @SpringBean + NcmpAsyncRequestResponseEventProducer cpsAsyncRequestResponseEventProducerService = + new NcmpAsyncRequestResponseEventProducer(kafkaTemplate); + + @SpringBean + NcmpAsyncRequestResponseEventMapper ncmpAsyncRequestResponseEventMapper = + Mappers.getMapper(NcmpAsyncRequestResponseEventMapper.class) + + @SpringBean + NcmpAsyncRequestResponseEventConsumer ncmpAsyncRequestResponseEventConsumer = + new NcmpAsyncRequestResponseEventConsumer(cpsAsyncRequestResponseEventProducerService, + ncmpAsyncRequestResponseEventMapper) + + def jsonObjectMapper = new JsonObjectMapper(new ObjectMapper()) + + def kafkaConsumer = new KafkaConsumer<>(getConsumerConfigProperties()) + + def 'Consume and forward valid message'() { + given: 'consumer has a subscription' + kafkaConsumer.subscribe(['test-topic'] as List) + and: 'an event is sent' + def jsonData = TestUtils.getResourceFileContent('dmiAsyncRequestResponseEvent.json') + def testEventSent = jsonObjectMapper.convertJsonString(jsonData, DmiAsyncRequestResponseEvent.class) + when: 'the event is consumed' + ncmpAsyncRequestResponseEventConsumer.consumeAndForward(testEventSent) + and: 'the topic is polled' + def records = kafkaConsumer.poll(Duration.ofMillis(1500)) + then: 'poll returns one record' + assert records.size() == 1 + and: 'consumed forwarded event id is the same as sent event id' + def record = records.iterator().next() + assert testEventSent.eventId.equalsIgnoreCase(jsonObjectMapper.convertJsonString(record.value(), + NcmpAsyncRequestResponseEvent).getForwardedEvent().getEventId()) + } + + @DynamicPropertySource + static void registerKafkaProperties(DynamicPropertyRegistry dynamicPropertyRegistry) { + dynamicPropertyRegistry.add('spring.kafka.bootstrap-servers', kafkaTestContainer::getBootstrapServers) + } + +} diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/utils/DmiServiceUrlBuilderSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/utils/DmiServiceUrlBuilderSpec.groovy index 4c8dcace7..964826be1 100644 --- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/utils/DmiServiceUrlBuilderSpec.groovy +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/utils/DmiServiceUrlBuilderSpec.groovy @@ -45,14 +45,14 @@ class DmiServiceUrlBuilderSpec extends Specification { def uriVars = objectUnderTest.populateUriVariables(yangModelCmHandle, "cmHandle", PASSTHROUGH_RUNNING) and: 'query params' - def uriQueries = objectUnderTest.populateQueryParams(resourceId, - 'optionsParamInQuery', topicParamInQuery) + def uriQueries = objectUnderTest.populateQueryParams(resourceId, + 'optionsParamInQuery', topic) when: 'a dmi datastore service url is generated' def dmiServiceUrl = objectUnderTest.getDmiDatastoreUrl(uriQueries, uriVars) then: 'service url is generated as expected' assert dmiServiceUrl == expectedDmiServiceUrl where: 'the following parameters are used' - scenario | topicParamInQuery | resourceId || expectedDmiServiceUrl + scenario | topic | resourceId || expectedDmiServiceUrl 'With valid resourceId' | 'topicParamInQuery' | 'resourceId' || 'dmiServiceName/dmi/v1/ch/cmHandle/data/ds/ncmp-datastore:passthrough-running?resourceIdentifier=resourceId&options=optionsParamInQuery&topic=topicParamInQuery' 'With Empty resourceId' | 'topicParamInQuery' | '' || 'dmiServiceName/dmi/v1/ch/cmHandle/data/ds/ncmp-datastore:passthrough-running?options=optionsParamInQuery&topic=topicParamInQuery' 'With Empty dmi base path' | 'topicParamInQuery' | 'resourceId' || 'dmiServiceName/dmi/v1/ch/cmHandle/data/ds/ncmp-datastore:passthrough-running?resourceIdentifier=resourceId&options=optionsParamInQuery&topic=topicParamInQuery' -- cgit 1.2.3-korg