diff options
author | JosephKeenan <joseph.keenan@est.tech> | 2022-05-24 18:59:25 +0100 |
---|---|---|
committer | JosephKeenan <joseph.keenan@est.tech> | 2022-05-25 10:47:34 +0100 |
commit | f31c7f8bd4985c84f9126d071439c1a4de57f704 (patch) | |
tree | 3b5d91b6357705304ae95fe1ad01156afbded020 /cps-ncmp-service/src | |
parent | 4cf4962b74765a5afe234aa258a9143ea6936f73 (diff) |
Async request response NCMP -> Client
-Added consumer for DMI events and producer for forwarding to client
-Added schemas for events
-Updated tests
-Added new module for ncmp events
-Used mapstruct for event mapping
Issue-ID: CPS-830
Change-Id: I096d08af9d69092cf8651e11eaa00ce441fc3605
Signed-off-by: sourabh_sourabh <sourabh.sourabh@est.tech>
Signed-off-by: JosephKeenan <joseph.keenan@est.tech>
Signed-off-by: ToineSiebelink <toine.siebelink@est.tech>
Signed-off-by: JosephKeenan <joseph.keenan@est.tech>
Diffstat (limited to 'cps-ncmp-service/src')
6 files changed, 331 insertions, 3 deletions
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/NcmpAsyncRequestResponseEventConsumer.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/NcmpAsyncRequestResponseEventConsumer.java new file mode 100644 index 0000000000..4e5c57ba57 --- /dev/null +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/NcmpAsyncRequestResponseEventConsumer.java @@ -0,0 +1,55 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (c) 2022 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.cps.ncmp.api.impl.async; + +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.onap.cps.ncmp.event.model.DmiAsyncRequestResponseEvent; +import org.onap.cps.ncmp.event.model.NcmpAsyncRequestResponseEvent; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.stereotype.Component; + +/** + * Listener for cps-ncmp async request response events. + */ +@Component +@Slf4j +@RequiredArgsConstructor +public class NcmpAsyncRequestResponseEventConsumer { + + private final NcmpAsyncRequestResponseEventProducer ncmpAsyncRequestResponseEventProducer; + private final NcmpAsyncRequestResponseEventMapper ncmpAsyncRequestResponseEventMapper; + + /** + * Consume the specified event. + * + * @param dmiAsyncRequestResponseEvent the event to be consumed and produced. + */ + @KafkaListener(topics = "${app.ncmp.async-m2m.topic}") + public void consumeAndForward(final DmiAsyncRequestResponseEvent dmiAsyncRequestResponseEvent) { + log.debug("Consuming event {} ...", dmiAsyncRequestResponseEvent); + + final NcmpAsyncRequestResponseEvent ncmpAsyncRequestResponseEvent = + ncmpAsyncRequestResponseEventMapper.toNcmpAsyncEvent(dmiAsyncRequestResponseEvent); + ncmpAsyncRequestResponseEventProducer.sendMessage( + ncmpAsyncRequestResponseEvent.getEventId(), ncmpAsyncRequestResponseEvent); + } +} diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/NcmpAsyncRequestResponseEventMapper.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/NcmpAsyncRequestResponseEventMapper.java new file mode 100644 index 0000000000..5d8ac7f841 --- /dev/null +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/NcmpAsyncRequestResponseEventMapper.java @@ -0,0 +1,71 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (C) 2022 Nordix Foundation + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.cps.ncmp.api.impl.async; + +import java.time.ZonedDateTime; +import java.time.format.DateTimeFormatter; +import java.util.UUID; +import org.mapstruct.AfterMapping; +import org.mapstruct.Mapper; +import org.mapstruct.Mapping; +import org.mapstruct.MappingTarget; +import org.mapstruct.Named; +import org.onap.cps.ncmp.event.model.DmiAsyncRequestResponseEvent; +import org.onap.cps.ncmp.event.model.NcmpAsyncRequestResponseEvent; + +/** + * Mapper for converting DmiAsyncRequestResponseEvent to NcmpAsyncRequestResponseEvent. + */ +@Mapper(componentModel = "spring") +public interface NcmpAsyncRequestResponseEventMapper { + + @Mapping(source = "eventId", target = "eventId", qualifiedByName = "ncmpAsyncEventId") + @Mapping(source = "eventTime", target = "eventTime", qualifiedByName = "currentTime") + @Mapping(source = "eventId", target = "forwardedEvent.eventId") + @Mapping(source = "eventCorrelationId", target = "forwardedEvent.eventCorrelationId") + @Mapping(source = "eventSchema", target = "forwardedEvent.eventSchema") + @Mapping(source = "eventSource", target = "forwardedEvent.eventSource") + @Mapping(source = "eventTarget", target = "forwardedEvent.eventTarget") + @Mapping(source = "eventTime", target = "forwardedEvent.eventTime") + @Mapping(source = "eventType", target = "forwardedEvent.eventType") + @Mapping(source = "eventContent.responseStatus", target = "forwardedEvent.responseStatus") + @Mapping(source = "eventContent.responseCode", target = "forwardedEvent.responseCode") + @Mapping(source = "eventContent.responseDataSchema", target = "forwardedEvent.responseDataSchema") + NcmpAsyncRequestResponseEvent toNcmpAsyncEvent(DmiAsyncRequestResponseEvent dmiAsyncRequestResponseEvent); + + @Named("ncmpAsyncEventId") + static String getNcmpAsyncEventId(String eventId) { + return UUID.randomUUID().toString(); + } + + @Named("currentTime") + static String getFormattedCurrentTime(String eventTime) { + return ZonedDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSZ")); + } + + @AfterMapping + default void mapAdditionalProperties(DmiAsyncRequestResponseEvent dmiAsyncRequestResponseEvent, + @MappingTarget NcmpAsyncRequestResponseEvent ncmpAsyncRequestResponseEvent) { + ncmpAsyncRequestResponseEvent.getForwardedEvent().setAdditionalProperty("response-data", + dmiAsyncRequestResponseEvent.getEventContent().getResponseData().getAdditionalProperties()); + } + +} diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/NcmpAsyncRequestResponseEventProducer.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/NcmpAsyncRequestResponseEventProducer.java new file mode 100644 index 0000000000..8ab6db9045 --- /dev/null +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/NcmpAsyncRequestResponseEventProducer.java @@ -0,0 +1,46 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (C) 2022 Nordix Foundation + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.cps.ncmp.api.impl.async; + +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.onap.cps.ncmp.event.model.NcmpAsyncRequestResponseEvent; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.stereotype.Service; + +@Slf4j +@Service +@RequiredArgsConstructor +public class NcmpAsyncRequestResponseEventProducer { + + private final KafkaTemplate<String, NcmpAsyncRequestResponseEvent> kafkaTemplate; + + + /** + * Sends message to the configured topic with a message key. + * + * @param eventId message key + * @param ncmpAsyncRequestResponseEvent message payload + */ + public void sendMessage(final String eventId, final NcmpAsyncRequestResponseEvent ncmpAsyncRequestResponseEvent) { + kafkaTemplate.send(ncmpAsyncRequestResponseEvent.getEventTarget(), eventId, ncmpAsyncRequestResponseEvent); + } +} diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/async/CpsAsyncRequestResponseEventIntegrationSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/async/CpsAsyncRequestResponseEventIntegrationSpec.groovy new file mode 100644 index 0000000000..aa6bf1a783 --- /dev/null +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/async/CpsAsyncRequestResponseEventIntegrationSpec.groovy @@ -0,0 +1,126 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (c) 2022 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an 'AS IS' BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.cps.ncmp.api.impl.async + +import org.apache.kafka.clients.consumer.ConsumerConfig +import org.apache.kafka.clients.producer.ProducerConfig +import org.apache.kafka.common.serialization.StringSerializer +import org.mapstruct.factory.Mappers +import org.onap.cps.ncmp.event.model.DmiAsyncRequestResponseEvent +import org.onap.cps.ncmp.event.model.NcmpAsyncRequestResponseEvent +import org.springframework.kafka.core.DefaultKafkaProducerFactory +import org.springframework.kafka.core.KafkaTemplate +import org.springframework.kafka.support.serializer.JsonSerializer +import org.testcontainers.containers.KafkaContainer +import org.testcontainers.spock.Testcontainers +import org.testcontainers.utility.DockerImageName + +import java.time.Duration +import com.fasterxml.jackson.databind.ObjectMapper +import org.onap.cps.utils.JsonObjectMapper +import org.apache.kafka.clients.consumer.KafkaConsumer +import org.apache.kafka.common.serialization.StringDeserializer +import org.onap.cps.ncmp.utils.TestUtils; +import org.springframework.boot.test.context.SpringBootTest +import org.spockframework.spring.SpringBean +import org.springframework.test.annotation.DirtiesContext +import org.springframework.test.context.DynamicPropertyRegistry +import org.springframework.test.context.DynamicPropertySource +import spock.lang.Specification + +@SpringBootTest(classes = [NcmpAsyncRequestResponseEventProducer, NcmpAsyncRequestResponseEventConsumer]) +@Testcontainers +@DirtiesContext +class NcmpAsyncRequestResponseEventProducerIntegrationSpec extends Specification { + + static kafkaTestContainer = new KafkaContainer( + DockerImageName.parse('confluentinc/cp-kafka:6.2.1') + ) + + static { + Runtime.getRuntime().addShutdownHook(new Thread(kafkaTestContainer::stop)) + } + + def setupSpec() { + kafkaTestContainer.start() + } + + def producerConfigProperties = [ + (ProducerConfig.BOOTSTRAP_SERVERS_CONFIG) : kafkaTestContainer.getBootstrapServers().split(',')[0], + (ProducerConfig.RETRIES_CONFIG) : 0, + (ProducerConfig.BATCH_SIZE_CONFIG) : 16384, + (ProducerConfig.LINGER_MS_CONFIG) : 1, + (ProducerConfig.BUFFER_MEMORY_CONFIG) : 33554432, + (ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG) : StringSerializer, + (ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG) : JsonSerializer + ] + + def consumerConfigProperties = [ + (ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG) : kafkaTestContainer.getBootstrapServers().split(',')[0], + (ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG) : StringDeserializer, + (ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG): StringDeserializer, + (ConsumerConfig.AUTO_OFFSET_RESET_CONFIG) : 'earliest', + (ConsumerConfig.GROUP_ID_CONFIG) : 'test' + ] + + def kafkaTemplate = new KafkaTemplate<>(new DefaultKafkaProducerFactory<Integer, String>(producerConfigProperties)) + + @SpringBean + NcmpAsyncRequestResponseEventProducer cpsAsyncRequestResponseEventProducerService = + new NcmpAsyncRequestResponseEventProducer(kafkaTemplate); + + @SpringBean + NcmpAsyncRequestResponseEventMapper ncmpAsyncRequestResponseEventMapper = + Mappers.getMapper(NcmpAsyncRequestResponseEventMapper.class) + + @SpringBean + NcmpAsyncRequestResponseEventConsumer ncmpAsyncRequestResponseEventConsumer = + new NcmpAsyncRequestResponseEventConsumer(cpsAsyncRequestResponseEventProducerService, + ncmpAsyncRequestResponseEventMapper) + + def jsonObjectMapper = new JsonObjectMapper(new ObjectMapper()) + + def kafkaConsumer = new KafkaConsumer<>(getConsumerConfigProperties()) + + def 'Consume and forward valid message'() { + given: 'consumer has a subscription' + kafkaConsumer.subscribe(['test-topic'] as List<String>) + and: 'an event is sent' + def jsonData = TestUtils.getResourceFileContent('dmiAsyncRequestResponseEvent.json') + def testEventSent = jsonObjectMapper.convertJsonString(jsonData, DmiAsyncRequestResponseEvent.class) + when: 'the event is consumed' + ncmpAsyncRequestResponseEventConsumer.consumeAndForward(testEventSent) + and: 'the topic is polled' + def records = kafkaConsumer.poll(Duration.ofMillis(1500)) + then: 'poll returns one record' + assert records.size() == 1 + and: 'consumed forwarded event id is the same as sent event id' + def record = records.iterator().next() + assert testEventSent.eventId.equalsIgnoreCase(jsonObjectMapper.convertJsonString(record.value(), + NcmpAsyncRequestResponseEvent).getForwardedEvent().getEventId()) + } + + @DynamicPropertySource + static void registerKafkaProperties(DynamicPropertyRegistry dynamicPropertyRegistry) { + dynamicPropertyRegistry.add('spring.kafka.bootstrap-servers', kafkaTestContainer::getBootstrapServers) + } + +} diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/utils/DmiServiceUrlBuilderSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/utils/DmiServiceUrlBuilderSpec.groovy index 4c8dcace7d..964826be13 100644 --- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/utils/DmiServiceUrlBuilderSpec.groovy +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/utils/DmiServiceUrlBuilderSpec.groovy @@ -45,14 +45,14 @@ class DmiServiceUrlBuilderSpec extends Specification { def uriVars = objectUnderTest.populateUriVariables(yangModelCmHandle, "cmHandle", PASSTHROUGH_RUNNING) and: 'query params' - def uriQueries = objectUnderTest.populateQueryParams(resourceId, - 'optionsParamInQuery', topicParamInQuery) + def uriQueries = objectUnderTest.populateQueryParams(resourceId, + 'optionsParamInQuery', topic) when: 'a dmi datastore service url is generated' def dmiServiceUrl = objectUnderTest.getDmiDatastoreUrl(uriQueries, uriVars) then: 'service url is generated as expected' assert dmiServiceUrl == expectedDmiServiceUrl where: 'the following parameters are used' - scenario | topicParamInQuery | resourceId || expectedDmiServiceUrl + scenario | topic | resourceId || expectedDmiServiceUrl 'With valid resourceId' | 'topicParamInQuery' | 'resourceId' || 'dmiServiceName/dmi/v1/ch/cmHandle/data/ds/ncmp-datastore:passthrough-running?resourceIdentifier=resourceId&options=optionsParamInQuery&topic=topicParamInQuery' 'With Empty resourceId' | 'topicParamInQuery' | '' || 'dmiServiceName/dmi/v1/ch/cmHandle/data/ds/ncmp-datastore:passthrough-running?options=optionsParamInQuery&topic=topicParamInQuery' 'With Empty dmi base path' | 'topicParamInQuery' | 'resourceId' || 'dmiServiceName/dmi/v1/ch/cmHandle/data/ds/ncmp-datastore:passthrough-running?resourceIdentifier=resourceId&options=optionsParamInQuery&topic=topicParamInQuery' diff --git a/cps-ncmp-service/src/test/resources/dmiAsyncRequestResponseEvent.json b/cps-ncmp-service/src/test/resources/dmiAsyncRequestResponseEvent.json new file mode 100644 index 0000000000..bf6c86aaac --- /dev/null +++ b/cps-ncmp-service/src/test/resources/dmiAsyncRequestResponseEvent.json @@ -0,0 +1,30 @@ +{ + "eventId": "8dbfe0a7-3b28-4109-8fcb-9fbc9c37d56a", + "eventCorrelationId": "122ca20b-4f8c-4759-a2b4-f0b9456df204", + "eventTime": "2022-05-09T13:34:50.466+0000", + "eventSource": "org.onap.ncmp", + "eventSchema": "urn:cps:org.onap.cps:async-request-response-event-schema:v1", + "eventTarget": "test-topic", + "eventContent": { + "response-data-schema": "urn:cps:org.onap.cps:async-request-response-event-schema:v1", + "response-status": "SUCCESS", + "response-code": "200", + "response-data": { + "ietf-netconf-monitoring:netconf-state": { + "schemas": { + "schema": [ + { + "identifier": "ietf-tls-server", + "version": "2016-11-02", + "format": "ietf-netconf-monitoring:yang", + "namespace": "urn:ietf:params:xml:ns:yang:ietf-tls-server", + "location": [ + "NETCONF" + ] + } + ] + } + } + } + } +} |