diff options
Diffstat (limited to 'cps-ncmp-service/src')
5 files changed, 257 insertions, 60 deletions
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/event/NcmpEventsPublisher.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/event/NcmpEventsPublisher.java new file mode 100644 index 0000000000..52ac4685e2 --- /dev/null +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/event/NcmpEventsPublisher.java @@ -0,0 +1,67 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (C) 2022 Nordix Foundation + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.cps.ncmp.api.impl.event; + +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.onap.ncmp.cmhandle.lcm.event.NcmpEvent; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.support.SendResult; +import org.springframework.stereotype.Service; +import org.springframework.util.concurrent.ListenableFuture; +import org.springframework.util.concurrent.ListenableFutureCallback; + +/** + * NcmpEventsPublisher to publish the NcmpEvents on event of CREATE, UPDATE and DELETE. + */ + +@Slf4j +@Service +@RequiredArgsConstructor +public class NcmpEventsPublisher { + + private final KafkaTemplate<String, NcmpEvent> ncmpEventKafkaTemplate; + + /** + * NCMP Event publisher. + * + * @param topicName valid topic name + * @param eventKey message key + * @param ncmpEvent message payload + */ + public void publishEvent(final String topicName, final String eventKey, final NcmpEvent ncmpEvent) { + final ListenableFuture<SendResult<String, NcmpEvent>> ncmpEventFuture = + ncmpEventKafkaTemplate.send(topicName, eventKey, ncmpEvent); + + ncmpEventFuture.addCallback(new ListenableFutureCallback<>() { + @Override + public void onFailure(final Throwable throwable) { + log.error("Unable to publish event to topic : {} due to {}", topicName, throwable.getMessage()); + } + + @Override + public void onSuccess(final SendResult<String, NcmpEvent> result) { + log.debug("Successfully published event to topic : {} , NcmpEvent : {}", + result.getRecordMetadata().topic(), result.getProducerRecord().value()); + } + }); + } +} diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/async/CpsAsyncRequestResponseEventIntegrationSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/async/CpsAsyncRequestResponseEventIntegrationSpec.groovy index aa6bf1a783..31f179ab27 100644 --- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/async/CpsAsyncRequestResponseEventIntegrationSpec.groovy +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/async/CpsAsyncRequestResponseEventIntegrationSpec.groovy @@ -20,68 +20,26 @@ package org.onap.cps.ncmp.api.impl.async -import org.apache.kafka.clients.consumer.ConsumerConfig -import org.apache.kafka.clients.producer.ProducerConfig -import org.apache.kafka.common.serialization.StringSerializer +import com.fasterxml.jackson.databind.ObjectMapper +import org.apache.kafka.clients.consumer.KafkaConsumer import org.mapstruct.factory.Mappers +import org.onap.cps.ncmp.api.utils.MessagingSpec import org.onap.cps.ncmp.event.model.DmiAsyncRequestResponseEvent import org.onap.cps.ncmp.event.model.NcmpAsyncRequestResponseEvent -import org.springframework.kafka.core.DefaultKafkaProducerFactory -import org.springframework.kafka.core.KafkaTemplate -import org.springframework.kafka.support.serializer.JsonSerializer -import org.testcontainers.containers.KafkaContainer -import org.testcontainers.spock.Testcontainers -import org.testcontainers.utility.DockerImageName - -import java.time.Duration -import com.fasterxml.jackson.databind.ObjectMapper +import org.onap.cps.ncmp.utils.TestUtils import org.onap.cps.utils.JsonObjectMapper -import org.apache.kafka.clients.consumer.KafkaConsumer -import org.apache.kafka.common.serialization.StringDeserializer -import org.onap.cps.ncmp.utils.TestUtils; -import org.springframework.boot.test.context.SpringBootTest import org.spockframework.spring.SpringBean +import org.springframework.beans.factory.annotation.Autowired +import org.springframework.boot.test.context.SpringBootTest import org.springframework.test.annotation.DirtiesContext -import org.springframework.test.context.DynamicPropertyRegistry -import org.springframework.test.context.DynamicPropertySource -import spock.lang.Specification +import org.testcontainers.spock.Testcontainers + +import java.time.Duration -@SpringBootTest(classes = [NcmpAsyncRequestResponseEventProducer, NcmpAsyncRequestResponseEventConsumer]) +@SpringBootTest(classes = [NcmpAsyncRequestResponseEventProducer, NcmpAsyncRequestResponseEventConsumer, ObjectMapper, JsonObjectMapper]) @Testcontainers @DirtiesContext -class NcmpAsyncRequestResponseEventProducerIntegrationSpec extends Specification { - - static kafkaTestContainer = new KafkaContainer( - DockerImageName.parse('confluentinc/cp-kafka:6.2.1') - ) - - static { - Runtime.getRuntime().addShutdownHook(new Thread(kafkaTestContainer::stop)) - } - - def setupSpec() { - kafkaTestContainer.start() - } - - def producerConfigProperties = [ - (ProducerConfig.BOOTSTRAP_SERVERS_CONFIG) : kafkaTestContainer.getBootstrapServers().split(',')[0], - (ProducerConfig.RETRIES_CONFIG) : 0, - (ProducerConfig.BATCH_SIZE_CONFIG) : 16384, - (ProducerConfig.LINGER_MS_CONFIG) : 1, - (ProducerConfig.BUFFER_MEMORY_CONFIG) : 33554432, - (ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG) : StringSerializer, - (ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG) : JsonSerializer - ] - - def consumerConfigProperties = [ - (ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG) : kafkaTestContainer.getBootstrapServers().split(',')[0], - (ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG) : StringDeserializer, - (ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG): StringDeserializer, - (ConsumerConfig.AUTO_OFFSET_RESET_CONFIG) : 'earliest', - (ConsumerConfig.GROUP_ID_CONFIG) : 'test' - ] - - def kafkaTemplate = new KafkaTemplate<>(new DefaultKafkaProducerFactory<Integer, String>(producerConfigProperties)) +class NcmpAsyncRequestResponseEventProducerIntegrationSpec extends MessagingSpec { @SpringBean NcmpAsyncRequestResponseEventProducer cpsAsyncRequestResponseEventProducerService = @@ -96,9 +54,10 @@ class NcmpAsyncRequestResponseEventProducerIntegrationSpec extends Specification new NcmpAsyncRequestResponseEventConsumer(cpsAsyncRequestResponseEventProducerService, ncmpAsyncRequestResponseEventMapper) - def jsonObjectMapper = new JsonObjectMapper(new ObjectMapper()) + @Autowired + JsonObjectMapper jsonObjectMapper - def kafkaConsumer = new KafkaConsumer<>(getConsumerConfigProperties()) + def kafkaConsumer = new KafkaConsumer<>(consumerConfigProperties('test')) def 'Consume and forward valid message'() { given: 'consumer has a subscription' @@ -118,9 +77,4 @@ class NcmpAsyncRequestResponseEventProducerIntegrationSpec extends Specification NcmpAsyncRequestResponseEvent).getForwardedEvent().getEventId()) } - @DynamicPropertySource - static void registerKafkaProperties(DynamicPropertyRegistry dynamicPropertyRegistry) { - dynamicPropertyRegistry.add('spring.kafka.bootstrap-servers', kafkaTestContainer::getBootstrapServers) - } - } diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/event/NcmpEventsPublisherSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/event/NcmpEventsPublisherSpec.groovy new file mode 100644 index 0000000000..774a46558b --- /dev/null +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/event/NcmpEventsPublisherSpec.groovy @@ -0,0 +1,84 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (C) 2022 Nordix Foundation + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.cps.ncmp.api.impl.event + +import com.fasterxml.jackson.databind.ObjectMapper +import org.apache.kafka.clients.consumer.KafkaConsumer +import org.onap.cps.ncmp.api.utils.MessagingSpec +import org.onap.cps.ncmp.utils.TestUtils +import org.onap.cps.utils.JsonObjectMapper +import org.onap.ncmp.cmhandle.lcm.event.Event +import org.onap.ncmp.cmhandle.lcm.event.NcmpEvent +import org.spockframework.spring.SpringBean +import org.springframework.beans.factory.annotation.Autowired +import org.springframework.boot.test.context.SpringBootTest +import org.springframework.test.annotation.DirtiesContext +import org.testcontainers.spock.Testcontainers + +import java.time.Duration + +@SpringBootTest(classes = [NcmpEventsPublisher, ObjectMapper, JsonObjectMapper]) +@Testcontainers +@DirtiesContext +class NcmpEventsPublisherSpec extends MessagingSpec { + + def kafkaConsumer = new KafkaConsumer<>(consumerConfigProperties('ncmp-group')) + + def testTopic = 'ncmp-events-test' + + @SpringBean + NcmpEventsPublisher ncmpEventsPublisher = new NcmpEventsPublisher(kafkaTemplate) + + @Autowired + JsonObjectMapper jsonObjectMapper + + + def 'Produce and Consume Ncmp Event'() { + given: 'event key and event data' + def eventKey = 'ncmp' + def eventData = new NcmpEvent(eventId: 'test-uuid', + eventCorrelationId: 'cmhandle-as-correlationid', + eventSchema: URI.create('org.onap.ncmp.cmhandle.lcm.event:v1'), + eventSource: URI.create('org.onap.ncmp'), + eventTime: '2022-12-31T20:30:40.000+0000', + eventType: 'org.onap.ncmp.cmhandle.lcm.event', + event: new Event(cmHandleId: 'cmhandle-test', cmhandleState: 'READY', operation: 'CREATE', cmhandleProperties: [['publicProperty1': 'value1'], ['publicProperty2': 'value2']])) + and: 'we have an expected NcmpEvent' + def expectedJsonString = TestUtils.getResourceFileContent('expectedNcmpEvent.json') + def expectedNcmpEvent = jsonObjectMapper.convertJsonString(expectedJsonString, NcmpEvent.class) + and: 'consumer has a subscription' + kafkaConsumer.subscribe([testTopic] as List<String>) + when: 'an event is published' + ncmpEventsPublisher.publishEvent(testTopic, eventKey, eventData) + and: 'topic is polled' + def records = kafkaConsumer.poll(Duration.ofMillis(1500)) + then: 'no exception is thrown' + noExceptionThrown() + and: 'poll returns one record' + assert records.size() == 1 + and: 'record key matches the expected event key' + def record = records.iterator().next() + assert eventKey == record.key + and: 'record matches the expected event' + assert expectedNcmpEvent == jsonObjectMapper.convertJsonString(record.value, NcmpEvent.class) + + } +} diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/utils/MessagingSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/utils/MessagingSpec.groovy new file mode 100644 index 0000000000..097834afc5 --- /dev/null +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/utils/MessagingSpec.groovy @@ -0,0 +1,71 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (c) 2022 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an 'AS IS' BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.cps.ncmp.api.utils + +import org.apache.kafka.common.serialization.StringDeserializer +import org.apache.kafka.common.serialization.StringSerializer +import org.springframework.kafka.core.DefaultKafkaProducerFactory +import org.springframework.kafka.core.KafkaTemplate +import org.springframework.kafka.support.serializer.JsonSerializer +import org.springframework.test.context.DynamicPropertyRegistry +import org.springframework.test.context.DynamicPropertySource +import org.testcontainers.containers.KafkaContainer +import org.testcontainers.utility.DockerImageName +import spock.lang.Specification + +class MessagingSpec extends Specification { + + static { + Runtime.getRuntime().addShutdownHook(new Thread(kafkaTestContainer::stop)) + } + + def setupSpec() { + kafkaTestContainer.start() + } + + static kafkaTestContainer = new KafkaContainer(DockerImageName.parse('confluentinc/cp-kafka:6.2.1')) + + def producerConfigProperties() { + return [('bootstrap.servers'): kafkaTestContainer.getBootstrapServers().split(',')[0], + ('retries') : 0, + ('batch-size') : 16384, + ('linger.ms') : 1, + ('buffer.memory') : 33554432, + ('key.serializer') : StringSerializer, + ('value.serializer') : JsonSerializer] + } + + def consumerConfigProperties(consumerGroupId) { + return [('bootstrap.servers') : kafkaTestContainer.getBootstrapServers().split(',')[0], + ('key.deserializer') : StringDeserializer, + ('value.deserializer'): StringDeserializer, + ('auto.offset.reset') : 'earliest', + ('group.id') : consumerGroupId + ] + } + + def kafkaTemplate = new KafkaTemplate<>(new DefaultKafkaProducerFactory<Integer, String>(producerConfigProperties())) + + @DynamicPropertySource + static void registerKafkaProperties(DynamicPropertyRegistry dynamicPropertyRegistry) { + dynamicPropertyRegistry.add('spring.kafka.bootstrap-servers', kafkaTestContainer::getBootstrapServers) + } +} diff --git a/cps-ncmp-service/src/test/resources/expectedNcmpEvent.json b/cps-ncmp-service/src/test/resources/expectedNcmpEvent.json new file mode 100644 index 0000000000..903bc3aab8 --- /dev/null +++ b/cps-ncmp-service/src/test/resources/expectedNcmpEvent.json @@ -0,0 +1,21 @@ +{ + "eventId": "test-uuid", + "eventCorrelationId": "cmhandle-as-correlationid", + "eventTime": "2022-12-31T20:30:40.000+0000", + "eventSource": "org.onap.ncmp", + "eventType": "org.onap.ncmp.cmhandle.lcm.event", + "eventSchema": "org.onap.ncmp.cmhandle.lcm.event:v1", + "event": { + "cmHandleId": "cmhandle-test", + "operation": "CREATE", + "cmhandle-state": "READY", + "cmhandle-properties": [ + { + "publicProperty1": "value1" + }, + { + "publicProperty2": "value2" + } + ] + } +}
\ No newline at end of file |