diff options
author | halil.cakal <halil.cakal@est.tech> | 2023-07-03 15:24:02 +0100 |
---|---|---|
committer | halil.cakal <halil.cakal@est.tech> | 2023-07-07 17:38:00 +0100 |
commit | adfb0693ec18025abcd8b9036bafe20a25f2e496 (patch) | |
tree | bf8e1b1f2918ce5298774a379121b34e37b65742 /cps-ncmp-service/src | |
parent | fb7005ffda3f7d2194061192c5c8e4574a72027b (diff) |
Subscription Creation: NCMP to DMI CloudEvent transformation
- Add mapper to convert client event into ncmp event
- Add sample json object of ncmp version
- Change subscription event consumer to consume CloudEvents
- Change subscription event forwarder to publish CloudEvents
- Change test producer config to support CloudEvents
- Change sample subscription event json to comply with new schema
- Add more test for missing branches
- Change packages of the mappers into relevant directory
Issue-ID: CPS-1737
Change-Id: I8c9e6e7bf713a8fb530a0586dfb2bce796a462f5
Signed-off-by: halil.cakal <halil.cakal@est.tech>
Diffstat (limited to 'cps-ncmp-service/src')
14 files changed, 473 insertions, 144 deletions
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/ClientSubscriptionEventMapper.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/ClientSubscriptionEventMapper.java new file mode 100644 index 0000000000..59b1d09c7b --- /dev/null +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/ClientSubscriptionEventMapper.java @@ -0,0 +1,34 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (C) 2023 Nordix Foundation + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.cps.ncmp.api.impl.events.avcsubscription; + +import org.mapstruct.Mapper; +import org.mapstruct.Mapping; +import org.onap.cps.ncmp.events.avcsubscription1_0_0.client_to_ncmp.SubscriptionEvent; + +@Mapper(componentModel = "spring") +public interface ClientSubscriptionEventMapper { + + @Mapping(target = "data.predicates.targets", ignore = true) + org.onap.cps.ncmp.events.avcsubscription1_0_0.ncmp_to_dmi.SubscriptionEvent toNcmpSubscriptionEvent( + SubscriptionEvent subscriptionEvent); + +} diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventConsumer.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventConsumer.java index 88b41d0075..f511965c77 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventConsumer.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventConsumer.java @@ -20,13 +20,14 @@ package org.onap.cps.ncmp.api.impl.events.avcsubscription; +import io.cloudevents.CloudEvent; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.onap.cps.ncmp.api.impl.subscriptions.SubscriptionPersistence; +import org.onap.cps.ncmp.api.impl.utils.SubscriptionEventCloudMapper; import org.onap.cps.ncmp.api.impl.yangmodels.YangModelSubscriptionEvent; -import org.onap.cps.ncmp.event.model.InnerSubscriptionEvent; -import org.onap.cps.ncmp.event.model.SubscriptionEvent; +import org.onap.cps.ncmp.events.avcsubscription1_0_0.client_to_ncmp.SubscriptionEvent; import org.onap.cps.spi.exceptions.OperationNotYetSupportedException; import org.springframework.beans.factory.annotation.Value; import org.springframework.kafka.annotation.KafkaListener; @@ -54,28 +55,25 @@ public class SubscriptionEventConsumer { * @param subscriptionEventConsumerRecord the event to be consumed */ @KafkaListener(topics = "${app.ncmp.avc.subscription-topic}", - properties = {"spring.json.value.default.type=org.onap.cps.ncmp.event.model.SubscriptionEvent"}) - public void consumeSubscriptionEvent( - final ConsumerRecord<String, SubscriptionEvent> subscriptionEventConsumerRecord) { - final SubscriptionEvent subscriptionEvent = subscriptionEventConsumerRecord.value(); - final InnerSubscriptionEvent event = subscriptionEvent.getEvent(); - final String eventDatastore = event.getPredicates().getDatastore(); + containerFactory = "cloudEventConcurrentKafkaListenerContainerFactory") + public void consumeSubscriptionEvent(final ConsumerRecord<String, CloudEvent> subscriptionEventConsumerRecord) { + final CloudEvent cloudEvent = subscriptionEventConsumerRecord.value(); + final SubscriptionEvent subscriptionEvent = SubscriptionEventCloudMapper.toSubscriptionEvent(cloudEvent); + final String eventDatastore = subscriptionEvent.getData().getPredicates().getDatastore(); if (!(eventDatastore.equals("passthrough-running") || eventDatastore.equals("passthrough-operational"))) { throw new OperationNotYetSupportedException( "passthrough datastores are currently only supported for event subscriptions"); } - if ("CM".equals(event.getDataType().getDataCategory())) { - log.debug("Consuming event {} ...", subscriptionEvent); + if ("CM".equals(subscriptionEvent.getData().getDataType().getDataCategory())) { if (subscriptionModelLoaderEnabled) { persistSubscriptionEvent(subscriptionEvent); } - if ("CREATE".equals(subscriptionEvent.getEventType().value())) { + if ("CREATE".equals(cloudEvent.getType())) { log.info("Subscription for ClientID {} with name {} ...", - event.getSubscription().getClientID(), - event.getSubscription().getName()); + subscriptionEvent.getData().getSubscription().getClientID(), + subscriptionEvent.getData().getSubscription().getName()); if (notificationFeatureEnabled) { - subscriptionEventForwarder.forwardCreateSubscriptionEvent(subscriptionEvent, - subscriptionEventConsumerRecord.headers()); + subscriptionEventForwarder.forwardCreateSubscriptionEvent(subscriptionEvent); } } } else { diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventForwarder.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventForwarder.java index 1d87a057a7..1fe963a279 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventForwarder.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventForwarder.java @@ -21,13 +21,12 @@ package org.onap.cps.ncmp.api.impl.events.avcsubscription; import com.hazelcast.map.IMap; +import io.cloudevents.CloudEvent; import java.util.ArrayList; import java.util.Collection; -import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Objects; import java.util.Set; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; @@ -35,16 +34,17 @@ import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.common.header.Headers; import org.onap.cps.ncmp.api.impl.config.embeddedcache.ForwardedSubscriptionEventCacheConfig; import org.onap.cps.ncmp.api.impl.events.EventsPublisher; import org.onap.cps.ncmp.api.impl.subscriptions.SubscriptionPersistence; import org.onap.cps.ncmp.api.impl.subscriptions.SubscriptionStatus; import org.onap.cps.ncmp.api.impl.utils.DmiServiceNameOrganizer; +import org.onap.cps.ncmp.api.impl.utils.SubscriptionEventCloudMapper; import org.onap.cps.ncmp.api.impl.yangmodels.YangModelCmHandle; import org.onap.cps.ncmp.api.impl.yangmodels.YangModelSubscriptionEvent; import org.onap.cps.ncmp.api.inventory.InventoryPersistence; -import org.onap.cps.ncmp.event.model.SubscriptionEvent; +import org.onap.cps.ncmp.events.avcsubscription1_0_0.client_to_ncmp.SubscriptionEvent; +import org.onap.cps.ncmp.events.avcsubscription1_0_0.ncmp_to_dmi.CmHandle; import org.onap.cps.spi.exceptions.OperationNotYetSupportedException; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; @@ -56,10 +56,11 @@ import org.springframework.stereotype.Component; public class SubscriptionEventForwarder { private final InventoryPersistence inventoryPersistence; - private final EventsPublisher<SubscriptionEvent> eventsPublisher; + private final EventsPublisher<CloudEvent> eventsPublisher; private final IMap<String, Set<String>> forwardedSubscriptionEventCache; private final SubscriptionEventResponseOutcome subscriptionEventResponseOutcome; private final SubscriptionEventMapper subscriptionEventMapper; + private final ClientSubscriptionEventMapper clientSubscriptionEventMapper; private final SubscriptionPersistence subscriptionPersistence; private final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(); @Value("${app.ncmp.avc.subscription-forward-topic-prefix}") @@ -73,28 +74,22 @@ public class SubscriptionEventForwarder { * * @param subscriptionEvent the event to be forwarded */ - public void forwardCreateSubscriptionEvent(final SubscriptionEvent subscriptionEvent, - final Headers eventHeaders) { - final List<Object> cmHandleTargets = subscriptionEvent.getEvent().getPredicates().getTargets(); + public void forwardCreateSubscriptionEvent(final SubscriptionEvent subscriptionEvent) { + final List<String> cmHandleTargets = subscriptionEvent.getData().getPredicates().getTargets(); if (cmHandleTargets == null || cmHandleTargets.isEmpty() - || cmHandleTargets.stream().anyMatch(id -> ((String) id).contains("*"))) { + || cmHandleTargets.stream().anyMatch(id -> (id).contains("*"))) { throw new OperationNotYetSupportedException( "CMHandle targets are required. \"Wildcard\" operations are not yet supported"); } - final List<String> cmHandleTargetsAsStrings = cmHandleTargets.stream().map( - Objects::toString).collect(Collectors.toList()); final Collection<YangModelCmHandle> yangModelCmHandles = - inventoryPersistence.getYangModelCmHandles(cmHandleTargetsAsStrings); - + inventoryPersistence.getYangModelCmHandles(cmHandleTargets); final Map<String, Map<String, Map<String, String>>> dmiPropertiesPerCmHandleIdPerServiceName = DmiServiceNameOrganizer.getDmiPropertiesPerCmHandleIdPerServiceName(yangModelCmHandles); - - findDmisAndRespond(subscriptionEvent, eventHeaders, cmHandleTargetsAsStrings, - dmiPropertiesPerCmHandleIdPerServiceName); + findDmisAndRespond(subscriptionEvent, cmHandleTargets, dmiPropertiesPerCmHandleIdPerServiceName); } - private void findDmisAndRespond(final SubscriptionEvent subscriptionEvent, final Headers eventHeaders, - final List<String> cmHandleTargetsAsStrings, + private void findDmisAndRespond(final SubscriptionEvent subscriptionEvent, + final List<String> cmHandleTargetsAsStrings, final Map<String, Map<String, Map<String, String>>> dmiPropertiesPerCmHandleIdPerServiceName) { final List<String> cmHandlesThatExistsInDb = dmiPropertiesPerCmHandleIdPerServiceName.entrySet().stream() @@ -109,18 +104,20 @@ public class SubscriptionEventForwarder { updatesCmHandlesToRejectedAndPersistSubscriptionEvent(subscriptionEvent, targetCmHandlesDoesNotExistInDb); } if (dmisToRespond.isEmpty()) { - final String clientID = subscriptionEvent.getEvent().getSubscription().getClientID(); - final String subscriptionName = subscriptionEvent.getEvent().getSubscription().getName(); + final String clientID = subscriptionEvent.getData().getSubscription().getClientID(); + final String subscriptionName = subscriptionEvent.getData().getSubscription().getName(); subscriptionEventResponseOutcome.sendResponse(clientID, subscriptionName); } else { startResponseTimeout(subscriptionEvent, dmisToRespond); - forwardEventToDmis(dmiPropertiesPerCmHandleIdPerServiceName, subscriptionEvent, eventHeaders); + final org.onap.cps.ncmp.events.avcsubscription1_0_0.ncmp_to_dmi.SubscriptionEvent ncmpSubscriptionEvent = + clientSubscriptionEventMapper.toNcmpSubscriptionEvent(subscriptionEvent); + forwardEventToDmis(dmiPropertiesPerCmHandleIdPerServiceName, ncmpSubscriptionEvent); } } private void startResponseTimeout(final SubscriptionEvent subscriptionEvent, final Set<String> dmisToRespond) { - final String subscriptionClientId = subscriptionEvent.getEvent().getSubscription().getClientID(); - final String subscriptionName = subscriptionEvent.getEvent().getSubscription().getName(); + final String subscriptionClientId = subscriptionEvent.getData().getSubscription().getClientID(); + final String subscriptionName = subscriptionEvent.getData().getSubscription().getName(); final String subscriptionEventId = subscriptionClientId + subscriptionName; forwardedSubscriptionEventCache.put(subscriptionEventId, dmisToRespond, @@ -137,20 +134,33 @@ public class SubscriptionEventForwarder { } private void forwardEventToDmis(final Map<String, Map<String, Map<String, String>>> dmiNameCmHandleMap, - final SubscriptionEvent subscriptionEvent, - final Headers eventHeaders) { + final org.onap.cps.ncmp.events.avcsubscription1_0_0.ncmp_to_dmi.SubscriptionEvent + ncmpSubscriptionEvent) { dmiNameCmHandleMap.forEach((dmiName, cmHandlePropertiesMap) -> { - subscriptionEvent.getEvent().getPredicates().setTargets(Collections.singletonList(cmHandlePropertiesMap)); - final String eventKey = createEventKey(subscriptionEvent, dmiName); + final List<CmHandle> cmHandleTargets = cmHandlePropertiesMap.entrySet().stream().map( + cmHandleAndProperties -> { + final CmHandle cmHandle = new CmHandle(); + cmHandle.setId(cmHandleAndProperties.getKey()); + cmHandle.setAdditionalProperties(cmHandleAndProperties.getValue()); + return cmHandle; + }).collect(Collectors.toList()); + + ncmpSubscriptionEvent.getData().getPredicates().setTargets(cmHandleTargets); + final String eventKey = createEventKey(ncmpSubscriptionEvent, dmiName); final String dmiAvcSubscriptionTopic = dmiAvcSubscriptionTopicPrefix + dmiName; - eventsPublisher.publishEvent(dmiAvcSubscriptionTopic, eventKey, eventHeaders, subscriptionEvent); + + final CloudEvent ncmpSubscriptionCloudEvent = + SubscriptionEventCloudMapper.toCloudEvent(ncmpSubscriptionEvent, eventKey); + eventsPublisher.publishCloudEvent(dmiAvcSubscriptionTopic, eventKey, ncmpSubscriptionCloudEvent); }); } - private String createEventKey(final SubscriptionEvent subscriptionEvent, final String dmiName) { - return subscriptionEvent.getEvent().getSubscription().getClientID() + private String createEventKey( + final org.onap.cps.ncmp.events.avcsubscription1_0_0.ncmp_to_dmi.SubscriptionEvent subscriptionEvent, + final String dmiName) { + return subscriptionEvent.getData().getSubscription().getClientID() + "-" - + subscriptionEvent.getEvent().getSubscription().getName() + + subscriptionEvent.getData().getSubscription().getName() + "-" + dmiName; } diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventMapper.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventMapper.java index dcbdcf339d..bf9ceb1c3d 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventMapper.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventMapper.java @@ -27,17 +27,16 @@ import org.mapstruct.Mapping; import org.mapstruct.Named; import org.onap.cps.ncmp.api.impl.subscriptions.SubscriptionStatus; import org.onap.cps.ncmp.api.impl.yangmodels.YangModelSubscriptionEvent; -import org.onap.cps.ncmp.event.model.SubscriptionEvent; +import org.onap.cps.ncmp.events.avcsubscription1_0_0.client_to_ncmp.SubscriptionEvent; @Mapper(componentModel = "spring") public interface SubscriptionEventMapper { - @Mapping(source = "event.subscription.clientID", target = "clientId") - @Mapping(source = "event.subscription.name", target = "subscriptionName") - @Mapping(source = "event.subscription.isTagged", target = "tagged") - @Mapping(source = "event.predicates.targets", target = "predicates.targetCmHandles", + @Mapping(source = "data.subscription.clientID", target = "clientId") + @Mapping(source = "data.subscription.name", target = "subscriptionName") + @Mapping(source = "data.predicates.targets", target = "predicates.targetCmHandles", qualifiedByName = "mapTargetsToCmHandleTargets") - @Mapping(source = "event.predicates.datastore", target = "predicates.datastore") + @Mapping(source = "data.predicates.datastore", target = "predicates.datastore") YangModelSubscriptionEvent toYangModelSubscriptionEvent(SubscriptionEvent subscriptionEvent); /** @@ -47,8 +46,8 @@ public interface SubscriptionEventMapper { * @return TargetCmHandle list */ @Named("mapTargetsToCmHandleTargets") - default List<YangModelSubscriptionEvent.TargetCmHandle> mapTargetsToCmHandleTargets(List<Object> targets) { - return targets.stream().map(target -> new YangModelSubscriptionEvent.TargetCmHandle(target.toString(), + default List<YangModelSubscriptionEvent.TargetCmHandle> mapTargetsToCmHandleTargets(List<String> targets) { + return targets.stream().map(target -> new YangModelSubscriptionEvent.TargetCmHandle(target, SubscriptionStatus.PENDING)) .collect(Collectors.toList()); } diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/utils/SubscriptionEventCloudMapper.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/utils/SubscriptionEventCloudMapper.java new file mode 100644 index 0000000000..a7de479046 --- /dev/null +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/utils/SubscriptionEventCloudMapper.java @@ -0,0 +1,79 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (C) 2023 Nordix Foundation + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.cps.ncmp.api.impl.utils; + +import com.fasterxml.jackson.databind.ObjectMapper; +import io.cloudevents.CloudEvent; +import io.cloudevents.core.CloudEventUtils; +import io.cloudevents.core.builder.CloudEventBuilder; +import io.cloudevents.core.data.PojoCloudEventData; +import io.cloudevents.jackson.PojoCloudEventDataMapper; +import java.net.URI; +import lombok.AccessLevel; +import lombok.NoArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.onap.cps.ncmp.events.avcsubscription1_0_0.client_to_ncmp.SubscriptionEvent; + +@NoArgsConstructor(access = AccessLevel.PRIVATE) +@Slf4j +public class SubscriptionEventCloudMapper { + + private static final ObjectMapper objectMapper = new ObjectMapper(); + + /** + * Maps CloudEvent object to SubscriptionEvent. + * + * @param cloudEvent object. + * @return SubscriptionEvent deserialized. + */ + public static SubscriptionEvent toSubscriptionEvent(final CloudEvent cloudEvent) { + final PojoCloudEventData<SubscriptionEvent> deserializedCloudEvent = CloudEventUtils + .mapData(cloudEvent, PojoCloudEventDataMapper.from(objectMapper, SubscriptionEvent.class)); + if (deserializedCloudEvent == null) { + log.debug("No data found in the consumed event"); + return null; + } else { + final SubscriptionEvent subscriptionEvent = deserializedCloudEvent.getValue(); + log.debug("Consuming event {}", subscriptionEvent); + return subscriptionEvent; + } + } + + /** + * Maps SubscriptionEvent to a CloudEvent. + * + * @param ncmpSubscriptionEvent object. + * @param eventKey as String. + * @return CloudEvent builded. + */ + public static CloudEvent toCloudEvent( + final org.onap.cps.ncmp.events.avcsubscription1_0_0.ncmp_to_dmi.SubscriptionEvent ncmpSubscriptionEvent, + final String eventKey) { + try { + return CloudEventBuilder.v1() + .withData(objectMapper.writeValueAsBytes(ncmpSubscriptionEvent)) + .withId(eventKey).withType("CREATE").withSource( + URI.create(ncmpSubscriptionEvent.getData().getSubscription().getClientID())).build(); + } catch (final Exception ex) { + throw new RuntimeException("The Cloud Event could not be constructed.", ex); + } + } +} diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/avcsubscription/ClientSubscriptionEventMapperSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/avcsubscription/ClientSubscriptionEventMapperSpec.groovy new file mode 100644 index 0000000000..85a58cdf30 --- /dev/null +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/avcsubscription/ClientSubscriptionEventMapperSpec.groovy @@ -0,0 +1,60 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (C) 2023 Nordix Foundation + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.cps.ncmp.api.impl.events.avcsubscription + +import com.fasterxml.jackson.databind.ObjectMapper +import org.mapstruct.factory.Mappers +import org.onap.cps.ncmp.events.avcsubscription1_0_0.client_to_ncmp.SubscriptionEvent; +import org.onap.cps.ncmp.utils.TestUtils +import org.onap.cps.utils.JsonObjectMapper +import org.springframework.beans.factory.annotation.Autowired +import org.springframework.boot.test.context.SpringBootTest +import spock.lang.Specification + +@SpringBootTest(classes = [JsonObjectMapper, ObjectMapper]) +class ClientSubscriptionEventMapperSpec extends Specification { + + ClientSubscriptionEventMapper objectUnderTest = Mappers.getMapper(ClientSubscriptionEventMapper) + + @Autowired + JsonObjectMapper jsonObjectMapper + + def 'Map clients subscription event to ncmps subscription event'() { + given: 'a Subscription Event' + def jsonData = TestUtils.getResourceFileContent('avcSubscriptionCreationEvent.json') + def testEventToMap = jsonObjectMapper.convertJsonString(jsonData, SubscriptionEvent.class) + when: 'the client event is mapped to a ncmp subscription event' + def result = objectUnderTest.toNcmpSubscriptionEvent(testEventToMap) + then: 'the resulting ncmp subscription event contains the correct clientId' + assert result.getData().getSubscription().getClientID() == "SCO-9989752" + and: 'subscription name' + assert result.getData().getSubscription().getName() == "cm-subscription-001" + and: 'is tagged value is false' + assert result.getData().getSubscription().getIsTagged() == false + and: 'data category is CM' + assert result.getData().getDataType().getDataCategory() == 'CM' + and: 'predicate targets is null' + assert result.getData().getPredicates().getTargets() == [] + and: 'datastore is passthrough-running' + assert result.getData().getPredicates().getDatastore() == 'passthrough-running' + } + +} diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventConsumerSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventConsumerSpec.groovy index cccd61b716..d4ab1e88ad 100644 --- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventConsumerSpec.groovy +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventConsumerSpec.groovy @@ -21,11 +21,13 @@ package org.onap.cps.ncmp.api.impl.events.avcsubscription import com.fasterxml.jackson.databind.ObjectMapper +import io.cloudevents.CloudEvent +import io.cloudevents.core.builder.CloudEventBuilder import org.apache.kafka.clients.consumer.ConsumerRecord import org.onap.cps.ncmp.api.impl.subscriptions.SubscriptionPersistence import org.onap.cps.ncmp.api.impl.yangmodels.YangModelSubscriptionEvent import org.onap.cps.ncmp.api.kafka.MessagingBaseSpec -import org.onap.cps.ncmp.event.model.SubscriptionEvent +import org.onap.cps.ncmp.events.avcsubscription1_0_0.client_to_ncmp.SubscriptionEvent; import org.onap.cps.ncmp.utils.TestUtils import org.onap.cps.spi.exceptions.OperationNotYetSupportedException import org.onap.cps.utils.JsonObjectMapper @@ -45,70 +47,57 @@ class SubscriptionEventConsumerSpec extends MessagingBaseSpec { @Autowired JsonObjectMapper jsonObjectMapper + @Autowired + ObjectMapper objectMapper + + def 'Consume, persist and forward valid CM create message'() { given: 'an event with data category CM' def jsonData = TestUtils.getResourceFileContent('avcSubscriptionCreationEvent.json') def testEventSent = jsonObjectMapper.convertJsonString(jsonData, SubscriptionEvent.class) - def consumerRecord = new ConsumerRecord<String, SubscriptionEvent>('topic-name', 0, 0, 'event-key', testEventSent) + testEventSent.getData().getDataType().setDataCategory(dataCategory) + def testCloudEventSent = CloudEventBuilder.v1() + .withData(objectMapper.writeValueAsBytes(testEventSent)) + .withId('some-event-id') + .withType(dataType) + .withSource(URI.create('some-resource')) + .withExtension('correlationid', 'test-cmhandle1').build() + def consumerRecord = new ConsumerRecord<String, CloudEvent>('topic-name', 0, 0, 'event-key', testCloudEventSent) and: 'notifications are enabled' - objectUnderTest.notificationFeatureEnabled = true + objectUnderTest.notificationFeatureEnabled = isNotificationEnabled and: 'subscription model loader is enabled' - objectUnderTest.subscriptionModelLoaderEnabled = true + objectUnderTest.subscriptionModelLoaderEnabled = isModelLoaderEnabled when: 'the valid event is consumed' objectUnderTest.consumeSubscriptionEvent(consumerRecord) then: 'the event is mapped to a yangModelSubscription' - 1 * mockSubscriptionEventMapper.toYangModelSubscriptionEvent(testEventSent) >> yangModelSubscriptionEvent + numberOfTimesToPersist * mockSubscriptionEventMapper.toYangModelSubscriptionEvent(testEventSent) >> yangModelSubscriptionEvent and: 'the event is persisted' - 1 * mockSubscriptionPersistence.saveSubscriptionEvent(yangModelSubscriptionEvent) + numberOfTimesToPersist * mockSubscriptionPersistence.saveSubscriptionEvent(yangModelSubscriptionEvent) and: 'the event is forwarded' - 1 * mockSubscriptionEventForwarder.forwardCreateSubscriptionEvent(testEventSent, consumerRecord.headers()) - } - - def 'Consume valid CM create message where notifications and model loader are disabled'() { - given: 'an event with data category CM' - def jsonData = TestUtils.getResourceFileContent('avcSubscriptionCreationEvent.json') - def testEventSent = jsonObjectMapper.convertJsonString(jsonData, SubscriptionEvent.class) - def consumerRecord = new ConsumerRecord<String, SubscriptionEvent>('topic-name', 0, 0, 'event-key', testEventSent) - and: 'notifications are disabled' - objectUnderTest.notificationFeatureEnabled = false - and: 'subscription model loader is disabled' - objectUnderTest.subscriptionModelLoaderEnabled = false - when: 'the valid event is consumed' - objectUnderTest.consumeSubscriptionEvent(consumerRecord) - then: 'the event is not mapped to a yangModelSubscription' - 0 * mockSubscriptionEventMapper.toYangModelSubscriptionEvent(*_) >> yangModelSubscriptionEvent - and: 'the event is not persisted' - 0 * mockSubscriptionPersistence.saveSubscriptionEvent(*_) - and: 'the event is not forwarded' - 0 * mockSubscriptionEventForwarder.forwardCreateSubscriptionEvent(*_) - } - - def 'Consume valid FM message'() { - given: 'an event' - def jsonData = TestUtils.getResourceFileContent('avcSubscriptionCreationEvent.json') - def testEventSent = jsonObjectMapper.convertJsonString(jsonData, SubscriptionEvent.class) - def consumerRecord = new ConsumerRecord<String, SubscriptionEvent>('topic-name', 0, 0, 'event-key', testEventSent) - and: 'dataCategory is set to FM' - testEventSent.getEvent().getDataType().setDataCategory("FM") - when: 'the valid event is consumed' - objectUnderTest.consumeSubscriptionEvent(consumerRecord) - then: 'no exception is thrown' - noExceptionThrown() - and: 'the event is not mapped to a yangModelSubscription' - 0 * mockSubscriptionEventMapper.toYangModelSubscriptionEvent(testEventSent) >> yangModelSubscriptionEvent - and: 'the event is not persisted' - 0 * mockSubscriptionPersistence.saveSubscriptionEvent(yangModelSubscriptionEvent) - and: 'No event is forwarded' - 0 * mockSubscriptionEventForwarder.forwardCreateSubscriptionEvent(*_) + numberOfTimesToForward * mockSubscriptionEventForwarder.forwardCreateSubscriptionEvent(testEventSent) + where: 'given values are used' + scenario | dataCategory | dataType | isNotificationEnabled | isModelLoaderEnabled || numberOfTimesToForward || numberOfTimesToPersist + 'Both model loader and notification are enabled' | 'CM' | 'CREATE' | true | true || 1 || 1 + 'Both model loader and notification are disabled' | 'CM' | 'CREATE' | false | false || 0 || 0 + 'Model loader enabled and notification disabled' | 'CM' | 'CREATE' | false | true || 0 || 1 + 'Model loader disabled and notification enabled' | 'CM' | 'CREATE' | true | false || 1 || 0 + 'Flags are enabled but data category is FM' | 'FM' | 'CREATE' | true | true || 0 || 0 + 'Flags are enabled but data type is UPDATE' | 'CM' | 'UPDATE' | true | true || 0 || 1 } def 'Consume event with wrong datastore causes an exception'() { given: 'an event' def jsonData = TestUtils.getResourceFileContent('avcSubscriptionCreationEvent.json') def testEventSent = jsonObjectMapper.convertJsonString(jsonData, SubscriptionEvent.class) - def consumerRecord = new ConsumerRecord<String, SubscriptionEvent>('topic-name', 0, 0, 'event-key', testEventSent) and: 'datastore is set to a non passthrough datastore' - testEventSent.getEvent().getPredicates().setDatastore("operational") + testEventSent.getData().getPredicates().setDatastore('operational') + def testCloudEventSent = CloudEventBuilder.v1() + .withData(objectMapper.writeValueAsBytes(testEventSent)) + .withId('some-event-id') + .withType('CREATE') + .withSource(URI.create('some-resource')) + .withExtension('correlationid', 'test-cmhandle1').build() + def consumerRecord = new ConsumerRecord<String, SubscriptionEvent>('topic-name', 0, 0, 'event-key', testCloudEventSent) when: 'the valid event is consumed' objectUnderTest.consumeSubscriptionEvent(consumerRecord) then: 'an operation not yet supported exception is thrown' diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventForwarderSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventForwarderSpec.groovy index 41597edec8..2af32c20e9 100644 --- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventForwarderSpec.groovy +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventForwarderSpec.groovy @@ -22,7 +22,10 @@ package org.onap.cps.ncmp.api.impl.events.avcsubscription import com.fasterxml.jackson.databind.ObjectMapper import com.hazelcast.map.IMap -import org.apache.kafka.clients.consumer.ConsumerRecord +import io.cloudevents.CloudEvent +import io.cloudevents.core.CloudEventUtils +import io.cloudevents.core.data.PojoCloudEventData +import io.cloudevents.jackson.PojoCloudEventDataMapper import org.mapstruct.factory.Mappers import org.onap.cps.ncmp.api.impl.events.EventsPublisher import org.onap.cps.ncmp.api.impl.subscriptions.SubscriptionPersistence @@ -31,7 +34,8 @@ import org.onap.cps.ncmp.api.impl.yangmodels.YangModelCmHandle import org.onap.cps.ncmp.api.impl.yangmodels.YangModelSubscriptionEvent.TargetCmHandle import org.onap.cps.ncmp.api.inventory.InventoryPersistence import org.onap.cps.ncmp.api.kafka.MessagingBaseSpec -import org.onap.cps.ncmp.event.model.SubscriptionEvent +import org.onap.cps.ncmp.events.avcsubscription1_0_0.client_to_ncmp.SubscriptionEvent +import org.onap.cps.ncmp.events.avcsubscription1_0_0.ncmp_to_dmi.CmHandle; import org.onap.cps.ncmp.utils.TestUtils import org.onap.cps.spi.exceptions.OperationNotYetSupportedException import org.onap.cps.utils.JsonObjectMapper @@ -51,7 +55,7 @@ class SubscriptionEventForwarderSpec extends MessagingBaseSpec { @SpringBean InventoryPersistence mockInventoryPersistence = Mock(InventoryPersistence) @SpringBean - EventsPublisher<SubscriptionEvent> mockSubscriptionEventPublisher = Mock(EventsPublisher<SubscriptionEvent>) + EventsPublisher<CloudEvent> mockSubscriptionEventPublisher = Mock(EventsPublisher<CloudEvent>) @SpringBean IMap<String, Set<String>> mockForwardedSubscriptionEventCache = Mock(IMap<String, Set<String>>) @SpringBean @@ -60,14 +64,17 @@ class SubscriptionEventForwarderSpec extends MessagingBaseSpec { SubscriptionPersistence mockSubscriptionPersistence = Mock(SubscriptionPersistence) @SpringBean SubscriptionEventMapper subscriptionEventMapper = Mappers.getMapper(SubscriptionEventMapper) + @SpringBean + ClientSubscriptionEventMapper clientSubscriptionEventMapper = Mappers.getMapper(ClientSubscriptionEventMapper) @Autowired JsonObjectMapper jsonObjectMapper - def 'Forward valid CM create subscription and simulate timeout where #scenario'() { + def objectMapper = new ObjectMapper() + + def 'Forward valid CM create subscription and simulate timeout'() { given: 'an event' def jsonData = TestUtils.getResourceFileContent('avcSubscriptionCreationEvent.json') def testEventSent = jsonObjectMapper.convertJsonString(jsonData, SubscriptionEvent.class) - def consumerRecord = new ConsumerRecord<String, SubscriptionEvent>('topic-name', 0, 0, 'event-key', testEventSent) and: 'the some of the cm handles will be accepted and some of rejected' def cmHandlesToBeSavedInDb = [new TargetCmHandle('CMHandle1', SubscriptionStatus.ACCEPTED), new TargetCmHandle('CMHandle2',SubscriptionStatus.ACCEPTED), @@ -85,17 +92,18 @@ class SubscriptionEventForwarderSpec extends MessagingBaseSpec { and: 'a Blocking Variable is used for the Asynchronous call with a timeout of 5 seconds' def block = new BlockingVariable<Object>(5) when: 'the valid event is forwarded' - objectUnderTest.forwardCreateSubscriptionEvent(testEventSent, consumerRecord.headers()) + objectUnderTest.forwardCreateSubscriptionEvent(testEventSent) then: 'An asynchronous call is made to the blocking variable' block.get() then: 'the event is added to the forwarded subscription event cache' 1 * mockForwardedSubscriptionEventCache.put("SCO-9989752cm-subscription-001", ["DMIName1"] as Set, 600, TimeUnit.SECONDS) and: 'the event is forwarded twice with the CMHandle private properties and provides a valid listenable future' - 1 * mockSubscriptionEventPublisher.publishEvent("ncmp-dmi-cm-avc-subscription-DMIName1", "SCO-9989752-cm-subscription-001-DMIName1", - consumerRecord.headers(), subscriptionEvent -> { - Map targets = subscriptionEvent.getEvent().getPredicates().getTargets().get(0) - targets["CMHandle1"] == ["shape":"circle"] - targets["CMHandle2"] == ["shape":"square"] + 1 * mockSubscriptionEventPublisher.publishCloudEvent("ncmp-dmi-cm-avc-subscription-DMIName1", "SCO-9989752-cm-subscription-001-DMIName1", + cloudEvent -> { + def targets = toSubscriptionEvent(cloudEvent).getData().getPredicates().getTargets() + def cmHandle2 = createCmHandle('CMHandle2', ['shape':'square'] as Map) + def cmHandle1 = createCmHandle('CMHandle1', ['shape':'circle'] as Map) + targets == [cmHandle2, cmHandle1] } ) and: 'the persistence service save the yang model subscription event' @@ -111,11 +119,10 @@ class SubscriptionEventForwarderSpec extends MessagingBaseSpec { given: 'an event' def jsonData = TestUtils.getResourceFileContent('avcSubscriptionCreationEvent.json') def testEventSent = jsonObjectMapper.convertJsonString(jsonData, SubscriptionEvent.class) - def consumerRecord = new ConsumerRecord<String, SubscriptionEvent>('topic-name', 0, 0, 'event-key', testEventSent) and: 'the target CMHandles are set to #scenario' - testEventSent.getEvent().getPredicates().setTargets(invalidTargets) + testEventSent.getData().getPredicates().setTargets(invalidTargets) when: 'the event is forwarded' - objectUnderTest.forwardCreateSubscriptionEvent(testEventSent, consumerRecord.headers()) + objectUnderTest.forwardCreateSubscriptionEvent(testEventSent) then: 'an operation not yet supported exception is thrown' thrown(OperationNotYetSupportedException) where: @@ -129,7 +136,6 @@ class SubscriptionEventForwarderSpec extends MessagingBaseSpec { given: 'an event' def jsonData = TestUtils.getResourceFileContent('avcSubscriptionCreationEvent.json') def testEventSent = jsonObjectMapper.convertJsonString(jsonData, SubscriptionEvent.class) - def consumerRecord = new ConsumerRecord<String, SubscriptionEvent>('topic-name', 0, 0, 'event-key', testEventSent) and: 'the cm handles will be rejected' def rejectedCmHandles = [new TargetCmHandle('CMHandle1', SubscriptionStatus.REJECTED), new TargetCmHandle('CMHandle2',SubscriptionStatus.REJECTED), @@ -144,21 +150,23 @@ class SubscriptionEventForwarderSpec extends MessagingBaseSpec { and: 'a Blocking Variable is used for the Asynchronous call with a timeout of 5 seconds' def block = new BlockingVariable<Object>(5) when: 'the valid event is forwarded' - objectUnderTest.forwardCreateSubscriptionEvent(testEventSent, consumerRecord.headers()) + objectUnderTest.forwardCreateSubscriptionEvent(testEventSent) then: 'the event is not added to the forwarded subscription event cache' 0 * mockForwardedSubscriptionEventCache.put("SCO-9989752cm-subscription-001", ["DMIName1", "DMIName2"] as Set) and: 'the event is not being forwarded with the CMHandle private properties and does not provides a valid listenable future' - 0 * mockSubscriptionEventPublisher.publishEvent("ncmp-dmi-cm-avc-subscription-DMIName1", "SCO-9989752-cm-subscription-001-DMIName1", - consumerRecord.headers(),subscriptionEvent -> { - Map targets = subscriptionEvent.getEvent().getPredicates().getTargets().get(0) - targets["CMHandle1"] == ["shape":"circle"] - targets["CMHandle2"] == ["shape":"square"] + 0 * mockSubscriptionEventPublisher.publishCloudEvent("ncmp-dmi-cm-avc-subscription-DMIName1", "SCO-9989752-cm-subscription-001-DMIName1", + cloudEvent -> { + def targets = toSubscriptionEvent(cloudEvent).getData().getPredicates().getTargets() + def cmHandle2 = createCmHandle('CMHandle2', ['shape':'square'] as Map) + def cmHandle1 = createCmHandle('CMHandle1', ['shape':'circle'] as Map) + targets == [cmHandle2, cmHandle1] } ) - 0 * mockSubscriptionEventPublisher.publishEvent("ncmp-dmi-cm-avc-subscription-DMIName2", "SCO-9989752-cm-subscription-001-DMIName2", - consumerRecord.headers(),subscriptionEvent -> { - Map targets = subscriptionEvent.getEvent().getPredicates().getTargets().get(0) - targets["CMHandle3"] == ["shape":"triangle"] + 0 * mockSubscriptionEventPublisher.publishCloudEvent("ncmp-dmi-cm-avc-subscription-DMIName2", "SCO-9989752-cm-subscription-001-DMIName2", + cloudEvent -> { + def targets = toSubscriptionEvent(cloudEvent).getData().getPredicates().getTargets() + def cmHandle3 = createCmHandle('CMHandle3', ['shape':'triangle'] as Map) + targets == [cmHandle3] } ) and: 'a separate thread has been created where the map is polled' @@ -176,4 +184,22 @@ class SubscriptionEventForwarderSpec extends MessagingBaseSpec { return new YangModelCmHandle(id:"CMHandle" + id, dmiDataServiceName: "DMIName" + dmiId, dmiProperties: [new YangModelCmHandle.Property(propertyName,propertyValue)]) } + static def createCmHandle(id, additionalProperties) { + def cmHandle = new CmHandle(); + cmHandle.setId(id) + cmHandle.setAdditionalProperties(additionalProperties) + return cmHandle + } + + def toSubscriptionEvent(cloudEvent) { + final PojoCloudEventData<org.onap.cps.ncmp.events.avcsubscription1_0_0.ncmp_to_dmi.SubscriptionEvent> deserializedCloudEvent = CloudEventUtils + .mapData(cloudEvent, PojoCloudEventDataMapper.from(objectMapper, + org.onap.cps.ncmp.events.avcsubscription1_0_0.ncmp_to_dmi.SubscriptionEvent.class)); + if (deserializedCloudEvent == null) { + return null; + } else { + return deserializedCloudEvent.getValue(); + } + } + } diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/avc/SubscriptionEventMapperSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventMapperSpec.groovy index 6d02ac719e..a2a80e854b 100644 --- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/avc/SubscriptionEventMapperSpec.groovy +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventMapperSpec.groovy @@ -18,13 +18,13 @@ * ============LICENSE_END========================================================= */ -package org.onap.cps.ncmp.api.impl.events.avc +package org.onap.cps.ncmp.api.impl.events.avcsubscription import com.fasterxml.jackson.databind.ObjectMapper import org.mapstruct.factory.Mappers import org.onap.cps.ncmp.api.impl.events.avcsubscription.SubscriptionEventMapper import org.onap.cps.ncmp.api.impl.subscriptions.SubscriptionStatus -import org.onap.cps.ncmp.event.model.SubscriptionEvent +import org.onap.cps.ncmp.events.avcsubscription1_0_0.client_to_ncmp.SubscriptionEvent import org.onap.cps.ncmp.utils.TestUtils import org.onap.cps.utils.JsonObjectMapper import org.springframework.beans.factory.annotation.Autowired diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/avc/SubscriptionEventResponseMapperSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventResponseMapperSpec.groovy index cde0d1fa00..00412aa933 100644 --- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/avc/SubscriptionEventResponseMapperSpec.groovy +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/avcsubscription/SubscriptionEventResponseMapperSpec.groovy @@ -18,7 +18,7 @@ * ============LICENSE_END========================================================= */ -package org.onap.cps.ncmp.api.impl.events.avc +package org.onap.cps.ncmp.api.impl.events.avcsubscription import com.fasterxml.jackson.databind.ObjectMapper import org.mapstruct.factory.Mappers diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/utils/SubscriptionEventCloudMapperSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/utils/SubscriptionEventCloudMapperSpec.groovy new file mode 100644 index 0000000000..61eb319101 --- /dev/null +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/utils/SubscriptionEventCloudMapperSpec.groovy @@ -0,0 +1,104 @@ +/* + * ============LICENSE_START======================================================== + * Copyright (c) 2023 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an 'AS IS' BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.cps.ncmp.api.impl.utils + +import com.fasterxml.jackson.databind.ObjectMapper +import io.cloudevents.core.builder.CloudEventBuilder +import org.onap.cps.ncmp.events.avcsubscription1_0_0.client_to_ncmp.SubscriptionEvent +import org.onap.cps.ncmp.utils.TestUtils +import org.onap.cps.utils.JsonObjectMapper +import org.springframework.beans.factory.annotation.Autowired +import org.springframework.boot.test.context.SpringBootTest +import spock.lang.Specification + +@SpringBootTest(classes = [ObjectMapper, JsonObjectMapper]) +class SubscriptionEventCloudMapperSpec extends Specification { + + @Autowired + JsonObjectMapper jsonObjectMapper + + @Autowired + ObjectMapper objectMapper + + def 'Map the data of the cloud event to subscription event'() { + given: 'a cloud event having a subscription event in the data part' + def jsonData = TestUtils.getResourceFileContent('avcSubscriptionCreationEvent.json') + def testEventData = jsonObjectMapper.convertJsonString(jsonData, SubscriptionEvent.class) + def testCloudEvent = CloudEventBuilder.v1() + .withData(objectMapper.writeValueAsBytes(testEventData)) + .withId('some-event-id') + .withType('CREATE') + .withSource(URI.create('some-resource')) + .withExtension('correlationid', 'test-cmhandle1').build() + when: 'the cloud event map to subscription event' + def resultSubscriptionEvent = SubscriptionEventCloudMapper.toSubscriptionEvent(testCloudEvent) + then: 'the subscription event resulted having expected values' + resultSubscriptionEvent.getData() == testEventData.getData() + } + + def 'Map the null of the data of the cloud event to subscription event'() { + given: 'a cloud event having a null subscription event in the data part' + def testCloudEvent = CloudEventBuilder.v1() + .withData(null) + .withId('some-event-id') + .withType('CREATE') + .withSource(URI.create('some-resource')) + .withExtension('correlationid', 'test-cmhandle1').build() + when: 'the cloud event map to subscription event' + def resultSubscriptionEvent = SubscriptionEventCloudMapper.toSubscriptionEvent(testCloudEvent) + then: 'the subscription event resulted having a null value' + resultSubscriptionEvent == null + } + + def 'Map the subscription event to data of the cloud event'() { + given: 'a subscription event' + def jsonData = TestUtils.getResourceFileContent('avcSubscriptionCreationEventNcmpVersion.json') + def testEventData = jsonObjectMapper.convertJsonString(jsonData, + org.onap.cps.ncmp.events.avcsubscription1_0_0.ncmp_to_dmi.SubscriptionEvent.class) + def testCloudEvent = CloudEventBuilder.v1() + .withData(objectMapper.writeValueAsBytes(testEventData)) + .withId('some-event-key') + .withType('CREATE') + .withSource(URI.create('some-resource')) + .withExtension('correlationid', 'test-cmhandle1').build() + when: 'the subscription event map to data of cloud event' + def resultCloudEvent = SubscriptionEventCloudMapper.toCloudEvent(testEventData, 'some-event-key') + then: 'the subscription event resulted having expected values' + resultCloudEvent.getData() == testCloudEvent.getData() + resultCloudEvent.getId() == testCloudEvent.getId() + resultCloudEvent.getType() == testCloudEvent.getType() + } + + def 'Map the subscription event to data of the cloud event with wrong content causes an exception'() { + given: 'an empty ncmp subscription event' + def testNcmpSubscriptionEvent = new org.onap.cps.ncmp.events.avcsubscription1_0_0.ncmp_to_dmi.SubscriptionEvent() + when: 'the subscription event map to data of cloud event' + def thrownException = null + try { + SubscriptionEventCloudMapper.toCloudEvent(testNcmpSubscriptionEvent, 'some-key') + } catch (Exception e) { + thrownException = e + } + then: 'a run time exception is thrown' + assert thrownException instanceof RuntimeException + } + +} diff --git a/cps-ncmp-service/src/test/java/org/onap/cps/ncmp/utils/KafkaDemoProducerConfig.java b/cps-ncmp-service/src/test/java/org/onap/cps/ncmp/utils/KafkaDemoProducerConfig.java index 43d26e900f..a9fd6f09f3 100644 --- a/cps-ncmp-service/src/test/java/org/onap/cps/ncmp/utils/KafkaDemoProducerConfig.java +++ b/cps-ncmp-service/src/test/java/org/onap/cps/ncmp/utils/KafkaDemoProducerConfig.java @@ -20,17 +20,17 @@ package org.onap.cps.ncmp.utils; +import io.cloudevents.CloudEvent; +import io.cloudevents.kafka.CloudEventSerializer; import java.util.HashMap; import java.util.Map; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.StringSerializer; -import org.onap.cps.ncmp.event.model.SubscriptionEvent; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.core.DefaultKafkaProducerFactory; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.core.ProducerFactory; -import org.springframework.kafka.support.serializer.JsonSerializer; @Configuration public class KafkaDemoProducerConfig { @@ -41,17 +41,17 @@ public class KafkaDemoProducerConfig { * @return kafka producer factory object of subscription event */ @Bean - public ProducerFactory<String, SubscriptionEvent> producerFactory() { + public ProducerFactory<String, CloudEvent> producerFactory() { final Map<String, Object> configProps = new HashMap<>(); configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "PLAINTEXT://localhost:9092,CONNECTIONS_FROM_HOST://localhost:19092"); configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); - configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); + configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, CloudEventSerializer.class); return new DefaultKafkaProducerFactory<>(configProps); } @Bean - public KafkaTemplate<String, SubscriptionEvent> kafkaTemplate() { + public KafkaTemplate<String, CloudEvent> kafkaTemplate() { return new KafkaTemplate<>(producerFactory()); } } diff --git a/cps-ncmp-service/src/test/resources/avcSubscriptionCreationEvent.json b/cps-ncmp-service/src/test/resources/avcSubscriptionCreationEvent.json index 63fca1f415..e4539fb38a 100644 --- a/cps-ncmp-service/src/test/resources/avcSubscriptionCreationEvent.json +++ b/cps-ncmp-service/src/test/resources/avcSubscriptionCreationEvent.json @@ -1,7 +1,5 @@ { - "version": "1.0", - "eventType": "CREATE", - "event": { + "data": { "subscription": { "clientID": "SCO-9989752", "name": "cm-subscription-001" @@ -9,15 +7,16 @@ "dataType": { "dataspace": "ALL", "dataCategory": "CM", - "dataProvider": "CM-SERVICE", - "schemaName": "org.onap.ncmp:cm-network-avc-event.rfc8641", - "schemaVersion": "1.0" + "dataProvider": "CM-SERVICE" }, "predicates": { - "targets" : ["CMHandle1", "CMHandle2", "CMHandle3"], + "targets": [ + "CMHandle1", + "CMHandle2", + "CMHandle3" + ], "datastore": "passthrough-running", - "xpath-filter": "//_3gpp-nr-nrm-gnbdufunction:GNBDUFunction/_3gpp-nr-nrm-nrcelldu:NRCellDU/ | //_3gpp-nr-nrm-gnbcuupfunction:GNBCUUPFunction// | //_3gpp-nr-nrm-gnbcucpfunction:GNBCUCPFunction/_3gpp-nr-nrm-nrcelldu:NRCellCU// | //_3gpp-nr-nrm-nrsectorcarrier:NRSectorCarrier//" + "datastore-xpath-filter": "//_3gpp-nr-nrm-gnbdufunction:GNBDUFunction/_3gpp-nr-nrm-nrcelldu:NRCellDU/ | //_3gpp-nr-nrm-gnbcuupfunction:GNBCUUPFunction// | //_3gpp-nr-nrm-gnbcucpfunction:GNBCUCPFunction/_3gpp-nr-nrm-nrcelldu:NRCellCU// | //_3gpp-nr-nrm-nrsectorcarrier:NRSectorCarrier//" + } } - -} }
\ No newline at end of file diff --git a/cps-ncmp-service/src/test/resources/avcSubscriptionCreationEventNcmpVersion.json b/cps-ncmp-service/src/test/resources/avcSubscriptionCreationEventNcmpVersion.json new file mode 100644 index 0000000000..f31362a1c6 --- /dev/null +++ b/cps-ncmp-service/src/test/resources/avcSubscriptionCreationEventNcmpVersion.json @@ -0,0 +1,31 @@ +{ + "data": { + "subscription": { + "clientID": "SCO-9989752", + "name": "cm-subscription-001" + }, + "dataType": { + "dataspace": "ALL", + "dataCategory": "CM", + "dataProvider": "CM-SERVICE" + }, + "predicates": { + "targets":[ + { + "id":"CMHandle2", + "additional-properties":{ + "Books":"Novel" + } + }, + { + "id":"CMHandle1", + "additional-properties":{ + "Books":"Social Media" + } + } + ], + "datastore": "passthrough-running", + "datastore-xpath-filter": "//_3gpp-nr-nrm-gnbdufunction:GNBDUFunction/_3gpp-nr-nrm-nrcelldu:NRCellDU/ | //_3gpp-nr-nrm-gnbcuupfunction:GNBCUUPFunction// | //_3gpp-nr-nrm-gnbcucpfunction:GNBCUCPFunction/_3gpp-nr-nrm-nrcelldu:NRCellCU// | //_3gpp-nr-nrm-nrsectorcarrier:NRSectorCarrier//" + } + } +}
\ No newline at end of file |