From 2cd8b98223bd49975fcca0ec7f1d4673a4163074 Mon Sep 17 00:00:00 2001 From: JosephKeenan Date: Mon, 23 May 2022 15:43:05 +0100 Subject: Async request response dmi -> NCMP -Added Async for passthrough running and operational -Build will fail until cps is merged https://gerrit.onap.org/r/c/cps/+/128685 Issue-ID: CPS-830 Change-Id: Iedbfab109f5cd777a5be8eed7414758d0f5ec05c Signed-off-by: JosephKeenan Signed-off-by: ToineSiebelink Signed-off-by: JosephKeenan --- .../async/AsyncTaskExecutorIntegrationSpec.groovy | 136 +++++++++++++++++++++ .../rest/controller/DmiRestControllerSpec.groovy | 28 ++++- .../cps/ncmp/dmi/service/DmiServiceImplSpec.groovy | 4 +- .../service/NcmpKafkaPublisherServiceSpec.groovy | 41 ------- .../ncmp/dmi/service/NcmpKafkaPublisherSpec.groovy | 107 ---------------- 5 files changed, 163 insertions(+), 153 deletions(-) create mode 100644 src/test/groovy/org/onap/cps/ncmp/dmi/notifications/async/AsyncTaskExecutorIntegrationSpec.groovy delete mode 100644 src/test/groovy/org/onap/cps/ncmp/dmi/service/NcmpKafkaPublisherServiceSpec.groovy delete mode 100644 src/test/groovy/org/onap/cps/ncmp/dmi/service/NcmpKafkaPublisherSpec.groovy (limited to 'src/test/groovy/org/onap') diff --git a/src/test/groovy/org/onap/cps/ncmp/dmi/notifications/async/AsyncTaskExecutorIntegrationSpec.groovy b/src/test/groovy/org/onap/cps/ncmp/dmi/notifications/async/AsyncTaskExecutorIntegrationSpec.groovy new file mode 100644 index 00000000..54c0fe09 --- /dev/null +++ b/src/test/groovy/org/onap/cps/ncmp/dmi/notifications/async/AsyncTaskExecutorIntegrationSpec.groovy @@ -0,0 +1,136 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (C) 2022 Nordix Foundation + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the 'License'); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an 'AS IS' BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.cps.ncmp.dmi.notifications.async + +import com.fasterxml.jackson.databind.ObjectMapper +import org.apache.kafka.clients.consumer.KafkaConsumer +import org.apache.kafka.common.serialization.StringDeserializer +import org.apache.kafka.common.serialization.StringSerializer +import org.onap.cps.ncmp.dmi.exception.HttpClientRequestException +import org.onap.cps.ncmp.dmi.model.DataAccessRequest +import org.onap.cps.ncmp.event.model.DmiAsyncRequestResponseEvent +import org.spockframework.spring.SpringBean +import org.springframework.boot.test.context.SpringBootTest +import org.springframework.http.HttpStatus +import org.springframework.kafka.core.DefaultKafkaProducerFactory +import org.springframework.kafka.core.KafkaTemplate +import org.springframework.kafka.support.serializer.JsonSerializer +import org.springframework.test.annotation.DirtiesContext +import org.springframework.test.context.DynamicPropertyRegistry +import org.springframework.test.context.DynamicPropertySource +import org.testcontainers.containers.KafkaContainer +import org.testcontainers.spock.Testcontainers +import org.testcontainers.utility.DockerImageName +import spock.lang.Specification + +import java.time.Duration + +@SpringBootTest(classes = [AsyncTaskExecutor, DmiAsyncRequestResponseEventProducer]) +@Testcontainers +@DirtiesContext +class AsyncTaskExecutorIntegrationSpec extends Specification { + + static kafkaTestContainer = new KafkaContainer( + DockerImageName.parse('confluentinc/cp-kafka:6.2.1') + ) + + static { + Runtime.getRuntime().addShutdownHook(new Thread(kafkaTestContainer::stop)) + } + + def setupSpec() { + kafkaTestContainer.start() + } + + def producerConfigProperties = [ + 'bootstrap.servers' : kafkaTestContainer.getBootstrapServers().split(',')[0], + 'retries' : 0, + 'batch.size' : 16384, + 'linger.ms' : 1, + 'buffer.memory' : 33554432, + 'key.serializer' : StringSerializer, + 'value.serializer' : JsonSerializer + ] + + def consumerConfigProperties = [ + 'bootstrap.servers' : kafkaTestContainer.getBootstrapServers().split(',')[0], + 'key.deserializer' : StringDeserializer, + 'value.deserializer': StringDeserializer, + 'auto.offset.reset' : 'earliest', + 'group.id' : 'test' + ] + + def kafkaTemplate = new KafkaTemplate<>(new DefaultKafkaProducerFactory(producerConfigProperties)) + + @SpringBean + DmiAsyncRequestResponseEventProducer cpsAsyncRequestResponseEventProducer = + new DmiAsyncRequestResponseEventProducer(kafkaTemplate) + + KafkaConsumer consumer = new KafkaConsumer<>(consumerConfigProperties) + + def spiedObjectMapper = Spy(ObjectMapper) + + def objectUnderTest = new AsyncTaskExecutor(cpsAsyncRequestResponseEventProducer) + + private static final String TEST_TOPIC = 'test-topic' + + def setup() { + cpsAsyncRequestResponseEventProducer.dmiNcmpTopic = TEST_TOPIC + consumer.subscribe([TEST_TOPIC] as List) + } + + def cleanup() { + consumer.close() + } + + def 'Publish and Subscribe message - success'() { + when: 'a successful event is published' + objectUnderTest.publishAsyncEvent(TEST_TOPIC, '12345','{}', 'OK', '200') + and: 'the topic is polled' + def records = consumer.poll(Duration.ofMillis(1500)) + then: 'the record received is the event sent' + def record = records.iterator().next() + DmiAsyncRequestResponseEvent event = spiedObjectMapper.readValue(record.value(), DmiAsyncRequestResponseEvent) + and: 'the status & code matches expected' + assert event.getEventContent().getResponseStatus() == 'OK' + assert event.getEventContent().getResponseCode() == '200' + } + + def 'Publish and Subscribe message - failure'() { + when: 'a failure event is published' + def exception = new HttpClientRequestException('some cm handle', 'Node not found', HttpStatus.INTERNAL_SERVER_ERROR) + objectUnderTest.publishAsyncFailureEvent(TEST_TOPIC, '67890', DataAccessRequest.OperationEnum.READ, exception) + and: 'the topic is polled' + def records = consumer.poll(Duration.ofMillis(1500)) + then: 'the record received is the event sent' + def record = records.iterator().next() + DmiAsyncRequestResponseEvent event = spiedObjectMapper.readValue(record.value(), DmiAsyncRequestResponseEvent) + and: 'the status & code matches expected' + assert event.getEventContent().getResponseStatus() == 'Internal Server Error' + assert event.getEventContent().getResponseCode() == '500' + } + + @DynamicPropertySource + static void registerKafkaProperties(DynamicPropertyRegistry dynamicPropertyRegistry) { + dynamicPropertyRegistry.add('spring.kafka.bootstrap-servers', kafkaTestContainer::getBootstrapServers) + } + +} \ No newline at end of file diff --git a/src/test/groovy/org/onap/cps/ncmp/dmi/rest/controller/DmiRestControllerSpec.groovy b/src/test/groovy/org/onap/cps/ncmp/dmi/rest/controller/DmiRestControllerSpec.groovy index 1541f8ca..5bfbc400 100644 --- a/src/test/groovy/org/onap/cps/ncmp/dmi/rest/controller/DmiRestControllerSpec.groovy +++ b/src/test/groovy/org/onap/cps/ncmp/dmi/rest/controller/DmiRestControllerSpec.groovy @@ -21,11 +21,14 @@ package org.onap.cps.ncmp.dmi.rest.controller + import org.onap.cps.ncmp.dmi.TestUtils import org.onap.cps.ncmp.dmi.exception.DmiException import org.onap.cps.ncmp.dmi.exception.ModuleResourceNotFoundException import org.onap.cps.ncmp.dmi.exception.ModulesNotFoundException -import org.onap.cps.ncmp.dmi.service.NcmpKafkaPublisherService +import org.onap.cps.ncmp.dmi.notifications.async.AsyncTaskExecutor +import org.onap.cps.ncmp.dmi.notifications.async.DmiAsyncRequestResponseEventProducer + import org.onap.cps.ncmp.dmi.service.model.ModuleReference import org.onap.cps.ncmp.dmi.model.ModuleSet import org.onap.cps.ncmp.dmi.model.ModuleSetSchemas @@ -38,6 +41,7 @@ import org.springframework.beans.factory.annotation.Value import org.springframework.boot.test.autoconfigure.web.servlet.WebMvcTest import org.springframework.http.HttpStatus import org.springframework.http.MediaType +import org.springframework.kafka.core.KafkaTemplate import org.springframework.security.test.context.support.WithMockUser import org.springframework.test.web.servlet.MockMvc import spock.lang.Specification @@ -53,7 +57,7 @@ import static org.onap.cps.ncmp.dmi.model.DataAccessRequest.OperationEnum.UPDATE import static org.springframework.http.HttpStatus.CREATED import static org.springframework.http.HttpStatus.OK -@WebMvcTest(DmiRestController) +@WebMvcTest(DmiRestController.class) @WithMockUser class DmiRestControllerSpec extends Specification { @@ -64,7 +68,10 @@ class DmiRestControllerSpec extends Specification { DmiService mockDmiService = Mock() @SpringBean - NcmpKafkaPublisherService mockNcmpKafkaPublisherService = Mock() + DmiAsyncRequestResponseEventProducer cpsAsyncRequestResponseEventProducer = new DmiAsyncRequestResponseEventProducer(Mock(KafkaTemplate)) + + @SpringBean + AsyncTaskExecutor asyncTaskExecutor = new AsyncTaskExecutor(cpsAsyncRequestResponseEventProducer) @Value('${rest.api.dmi-base-path}/v1') def basePathV1 @@ -256,6 +263,21 @@ class DmiRestControllerSpec extends Specification { response.getContentAsString() == '{some-json}' } + def 'PassThrough Returns OK when topic is used for async'(){ + given: 'an endpoint' + def readPassThroughUrl ="${basePathV1}/ch/some-cmHandle/data/ds/ncmp-datastore:" + + resourceIdentifier + + '?resourceIdentifier=some-resourceIdentifier&topic=test-topic' + when: 'endpoint is invoked' + def jsonData = TestUtils.getResourceFileContent('readData.json') + def response = mvc.perform( + post(readPassThroughUrl).contentType(MediaType.APPLICATION_JSON).content(jsonData) + ).andReturn().response + then: 'response status is OK' + assert response.status == HttpStatus.NO_CONTENT.value() + where: 'the following values are used' + resourceIdentifier << ['passthrough-operational', 'passthrough-running'] + } def 'Get resource data for pass-through running with #scenario value in resource identifier param.'() { given: 'Get resource data url' diff --git a/src/test/groovy/org/onap/cps/ncmp/dmi/service/DmiServiceImplSpec.groovy b/src/test/groovy/org/onap/cps/ncmp/dmi/service/DmiServiceImplSpec.groovy index e38d5c37..1d87b775 100644 --- a/src/test/groovy/org/onap/cps/ncmp/dmi/service/DmiServiceImplSpec.groovy +++ b/src/test/groovy/org/onap/cps/ncmp/dmi/service/DmiServiceImplSpec.groovy @@ -29,7 +29,7 @@ import org.onap.cps.ncmp.dmi.exception.CmHandleRegistrationException import org.onap.cps.ncmp.dmi.exception.DmiException import org.onap.cps.ncmp.dmi.exception.ModuleResourceNotFoundException import org.onap.cps.ncmp.dmi.exception.ModulesNotFoundException -import org.onap.cps.ncmp.dmi.exception.ResourceDataNotFound +import org.onap.cps.ncmp.dmi.exception.HttpClientRequestException import org.onap.cps.ncmp.dmi.service.model.ModuleReference import org.onap.cps.ncmp.dmi.model.YangResource import org.onap.cps.ncmp.dmi.model.YangResources @@ -221,7 +221,7 @@ class DmiServiceImplSpec extends Specification { objectUnderTest.getResourceData(cmHandle, resourceId, optionsParam, restConfQueryParam) then: 'resource data not found' - thrown(ResourceDataNotFound.class) + thrown(HttpClientRequestException.class) } def 'Get resource data for passthrough running.'() { diff --git a/src/test/groovy/org/onap/cps/ncmp/dmi/service/NcmpKafkaPublisherServiceSpec.groovy b/src/test/groovy/org/onap/cps/ncmp/dmi/service/NcmpKafkaPublisherServiceSpec.groovy deleted file mode 100644 index f5bc4ac4..00000000 --- a/src/test/groovy/org/onap/cps/ncmp/dmi/service/NcmpKafkaPublisherServiceSpec.groovy +++ /dev/null @@ -1,41 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * Copyright (C) 2022 Nordix Foundation - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * SPDX-License-Identifier: Apache-2.0 - * ============LICENSE_END========================================================= - */ - -package org.onap.cps.ncmp.dmi.service - -import spock.lang.Specification - -class NcmpKafkaPublisherServiceSpec extends Specification { - - def mockNcmpKafkaPublisher = Mock(NcmpKafkaPublisher) - def objectUnderTest = new NcmpKafkaPublisherService(mockNcmpKafkaPublisher) - - def 'Message publishing'() { - given: 'a sample message with key' - def message = 'sample message' - def messageKey = 'sample-key' - when: 'published' - objectUnderTest.publishToNcmp(messageKey, message) - then: 'no exception is thrown' - noExceptionThrown() - and: 'message is published once' - 1 * mockNcmpKafkaPublisher.sendMessage(messageKey, message) - } -} diff --git a/src/test/groovy/org/onap/cps/ncmp/dmi/service/NcmpKafkaPublisherSpec.groovy b/src/test/groovy/org/onap/cps/ncmp/dmi/service/NcmpKafkaPublisherSpec.groovy deleted file mode 100644 index 00c8e6e7..00000000 --- a/src/test/groovy/org/onap/cps/ncmp/dmi/service/NcmpKafkaPublisherSpec.groovy +++ /dev/null @@ -1,107 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * Copyright (C) 2022 Nordix Foundation - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * SPDX-License-Identifier: Apache-2.0 - * ============LICENSE_END========================================================= - */ - -package org.onap.cps.ncmp.dmi.service - -import org.apache.kafka.clients.admin.NewTopic -import org.apache.kafka.clients.consumer.KafkaConsumer -import org.apache.kafka.common.serialization.StringDeserializer -import org.springframework.beans.factory.annotation.Autowired -import org.springframework.beans.factory.annotation.Value -import org.springframework.boot.test.context.SpringBootTest -import org.springframework.context.annotation.Bean -import org.springframework.context.annotation.Configuration -import org.springframework.kafka.core.KafkaTemplate -import org.springframework.kafka.support.serializer.JsonDeserializer -import org.springframework.test.annotation.DirtiesContext -import org.springframework.test.context.DynamicPropertyRegistry -import org.springframework.test.context.DynamicPropertySource -import org.testcontainers.containers.KafkaContainer -import org.testcontainers.spock.Testcontainers -import spock.lang.Specification - -import java.time.Duration - -import static org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -import static org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG -import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG - -@SpringBootTest -@Testcontainers -@DirtiesContext -class NcmpKafkaPublisherSpec extends Specification { - - static kafkaTestContainer = new KafkaContainer() - static { - Runtime.getRuntime().addShutdownHook(new Thread(kafkaTestContainer::stop)) - } - - def setupSpec() { - kafkaTestContainer.start() - } - - @Autowired - KafkaTemplate kafkaTemplate - - @Value('${app.ncmp.async-m2m.topic}') - String topic - - KafkaConsumer consumer = new KafkaConsumer<>(kafkaConsumerConfig()) - - def 'Publish and Subscribe message'() { - given: 'a sample messsage and key' - def message = 'sample message' - def messageKey = 'message-key' - def objectUnderTest = new NcmpKafkaPublisher(kafkaTemplate, topic) - when: 'a message is published' - objectUnderTest.sendMessage(messageKey, message) - then: 'a message is consumed' - consumer.subscribe([topic] as List) - def records = consumer.poll(Duration.ofMillis(1000)) - assert records.size() == 1 - assert messageKey == records[0].key - assert message == records[0].value - } - - def kafkaConsumerConfig() { - def configs = [:] - configs.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.name) - configs.put(VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.name) - configs.put(AUTO_OFFSET_RESET_CONFIG, 'earliest') - configs.put(BOOTSTRAP_SERVERS_CONFIG, kafkaTestContainer.getBootstrapServers().split(",")[0]) - configs.put(GROUP_ID_CONFIG, 'test') - return configs - } - - @DynamicPropertySource - static void registerKafkaProperties(DynamicPropertyRegistry registry) { - registry.add('spring.kafka.bootstrap-servers', kafkaTestContainer::getBootstrapServers) - } -} - -@Configuration -class TopicConfig { - @Bean - NewTopic newTopic() { - return new NewTopic("my-topic-name", 1, (short) 1); - } -} -- cgit 1.2.3-korg