diff options
Diffstat (limited to 'cps-ncmp-service/src/main/java')
-rw-r--r-- | cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/event/EventsPublisher.java (renamed from cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/event/lcm/LcmEventsPublisher.java) | 23 | ||||
-rw-r--r-- | cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/event/avc/SubscriptionEventConsumer.java | 25 | ||||
-rw-r--r-- | cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/event/avc/SubscriptionEventForwarder.java | 106 | ||||
-rw-r--r-- | cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/event/lcm/LcmEventsService.java | 5 | ||||
-rw-r--r-- | cps-ncmp-service/src/main/java/org/onap/cps/ncmp/init/SubscriptionModelLoader.java | 20 |
5 files changed, 161 insertions, 18 deletions
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/event/lcm/LcmEventsPublisher.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/event/EventsPublisher.java index eda881767d..60d39db7a2 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/event/lcm/LcmEventsPublisher.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/event/EventsPublisher.java @@ -18,11 +18,10 @@ * ============LICENSE_END========================================================= */ -package org.onap.cps.ncmp.api.impl.event.lcm; +package org.onap.cps.ncmp.api.impl.event; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import org.onap.ncmp.cmhandle.event.lcm.LcmEvent; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.support.SendResult; import org.springframework.stereotype.Service; @@ -30,36 +29,36 @@ import org.springframework.util.concurrent.ListenableFuture; import org.springframework.util.concurrent.ListenableFutureCallback; /** - * LcmEventsPublisher to publish the LcmEvents on event of CREATE, UPDATE and DELETE. + * EventsPublisher to publish events. */ @Slf4j @Service @RequiredArgsConstructor -public class LcmEventsPublisher { +public class EventsPublisher<T> { - private final KafkaTemplate<String, LcmEvent> lcmEventKafkaTemplate; + private final KafkaTemplate<String, T> eventKafkaTemplate; /** * LCM Event publisher. * * @param topicName valid topic name * @param eventKey message key - * @param lcmEvent message payload + * @param event message payload */ - public void publishEvent(final String topicName, final String eventKey, final LcmEvent lcmEvent) { - final ListenableFuture<SendResult<String, LcmEvent>> lcmEventFuture = - lcmEventKafkaTemplate.send(topicName, eventKey, lcmEvent); + public void publishEvent(final String topicName, final String eventKey, final T event) { + final ListenableFuture<SendResult<String, T>> eventFuture = + eventKafkaTemplate.send(topicName, eventKey, event); - lcmEventFuture.addCallback(new ListenableFutureCallback<>() { + eventFuture.addCallback(new ListenableFutureCallback<>() { @Override public void onFailure(final Throwable throwable) { log.error("Unable to publish event to topic : {} due to {}", topicName, throwable.getMessage()); } @Override - public void onSuccess(final SendResult<String, LcmEvent> sendResult) { - log.debug("Successfully published event to topic : {} , LcmEvent : {}", + public void onSuccess(final SendResult<String, T> sendResult) { + log.debug("Successfully published event to topic : {} , Event : {}", sendResult.getRecordMetadata().topic(), sendResult.getProducerRecord().value()); } }); diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/event/avc/SubscriptionEventConsumer.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/event/avc/SubscriptionEventConsumer.java index 92949cbb79..d08baac5d4 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/event/avc/SubscriptionEventConsumer.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/event/avc/SubscriptionEventConsumer.java @@ -22,7 +22,10 @@ package org.onap.cps.ncmp.api.impl.event.avc; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.onap.cps.ncmp.event.model.InnerSubscriptionEvent; import org.onap.cps.ncmp.event.model.SubscriptionEvent; +import org.onap.cps.spi.exceptions.OperationNotYetSupportedException; +import org.springframework.beans.factory.annotation.Value; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component; @@ -32,6 +35,11 @@ import org.springframework.stereotype.Component; @RequiredArgsConstructor public class SubscriptionEventConsumer { + private final SubscriptionEventForwarder subscriptionEventForwarder; + + @Value("${notification.enabled:true}") + private boolean notificationFeatureEnabled; + /** * Consume the specified event. * @@ -40,12 +48,21 @@ public class SubscriptionEventConsumer { @KafkaListener(topics = "${app.ncmp.avc.subscription-topic}", properties = {"spring.json.value.default.type=org.onap.cps.ncmp.event.model.SubscriptionEvent"}) public void consumeSubscriptionEvent(final SubscriptionEvent subscriptionEvent) { - if ("CM".equals(subscriptionEvent.getEvent().getDataType().getDataCategory())) { + final InnerSubscriptionEvent event = subscriptionEvent.getEvent(); + final String eventDatastore = event.getPredicates().getDatastore(); + if (!(eventDatastore.equals("passthrough-running") || eventDatastore.equals("passthrough-operational"))) { + throw new OperationNotYetSupportedException( + "passthrough datastores are currently only supported for event subscriptions"); + } + if ("CM".equals(event.getDataType().getDataCategory())) { log.debug("Consuming event {} ...", subscriptionEvent); if ("CREATE".equals(subscriptionEvent.getEventType().value())) { - log.info("Subscription for ClientID {} with name{} ...", - subscriptionEvent.getEvent().getSubscription().getClientID(), - subscriptionEvent.getEvent().getSubscription().getName()); + log.info("Subscription for ClientID {} with name {} ...", + event.getSubscription().getClientID(), + event.getSubscription().getName()); + if (notificationFeatureEnabled) { + subscriptionEventForwarder.forwardCreateSubscriptionEvent(subscriptionEvent); + } } } else { log.trace("Non-CM subscription event ignored"); diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/event/avc/SubscriptionEventForwarder.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/event/avc/SubscriptionEventForwarder.java new file mode 100644 index 0000000000..635059bfe1 --- /dev/null +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/event/avc/SubscriptionEventForwarder.java @@ -0,0 +1,106 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (C) 2023 Nordix Foundation + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.cps.ncmp.api.impl.event.avc; + +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.stream.Collectors; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.onap.cps.ncmp.api.impl.event.EventsPublisher; +import org.onap.cps.ncmp.api.impl.operations.RequiredDmiService; +import org.onap.cps.ncmp.api.impl.yangmodels.YangModelCmHandle; +import org.onap.cps.ncmp.api.inventory.InventoryPersistence; +import org.onap.cps.ncmp.event.model.SubscriptionEvent; +import org.onap.cps.spi.exceptions.OperationNotYetSupportedException; +import org.springframework.stereotype.Component; + + +@Component +@Slf4j +@RequiredArgsConstructor +public class SubscriptionEventForwarder { + + private final InventoryPersistence inventoryPersistence; + private final EventsPublisher<SubscriptionEvent> eventsPublisher; + + private static final String DMI_AVC_SUBSCRIPTION_TOPIC_PREFIX = "ncmp-dmi-cm-avc-subscription-"; + + /** + * Forward subscription event. + * + * @param subscriptionEvent the event to be forwarded + */ + public void forwardCreateSubscriptionEvent(final SubscriptionEvent subscriptionEvent) { + final List<Object> cmHandleTargets = subscriptionEvent.getEvent().getPredicates().getTargets(); + if (cmHandleTargets == null || cmHandleTargets.isEmpty() + || cmHandleTargets.stream().anyMatch(id -> ((String) id).contains("*"))) { + throw new OperationNotYetSupportedException( + "CMHandle targets are required. \"Wildcard\" operations are not yet supported"); + } + final List<String> cmHandleTargetsAsStrings = cmHandleTargets.stream().map( + Objects::toString).collect(Collectors.toList()); + final Collection<YangModelCmHandle> yangModelCmHandles = + inventoryPersistence.getYangModelCmHandles(cmHandleTargetsAsStrings); + final Map<String, Map<String, Map<String, String>>> dmiNameCmHandleMap = + organizeByDmiName(yangModelCmHandles); + dmiNameCmHandleMap.forEach((dmiName, cmHandlePropertiesMap) -> { + subscriptionEvent.getEvent().getPredicates().setTargets(Collections.singletonList(cmHandlePropertiesMap)); + final String eventKey = createEventKey(subscriptionEvent, dmiName); + eventsPublisher.publishEvent(DMI_AVC_SUBSCRIPTION_TOPIC_PREFIX + dmiName, eventKey, subscriptionEvent); + }); + } + + private Map<String, Map<String, Map<String, String>>> organizeByDmiName( + final Collection<YangModelCmHandle> yangModelCmHandles) { + final Map<String, Map<String, Map<String, String>>> dmiNameCmHandlePropertiesMap = new HashMap<>(); + yangModelCmHandles.forEach(cmHandle -> { + final String dmiName = cmHandle.resolveDmiServiceName(RequiredDmiService.DATA); + if (!dmiNameCmHandlePropertiesMap.containsKey(dmiName)) { + final Map<String, Map<String, String>> cmHandleDmiPropertiesMap = new HashMap<>(); + cmHandleDmiPropertiesMap.put(cmHandle.getId(), dmiPropertiesAsMap(cmHandle)); + dmiNameCmHandlePropertiesMap.put(cmHandle.getDmiDataServiceName(), cmHandleDmiPropertiesMap); + } else { + dmiNameCmHandlePropertiesMap.get(cmHandle.getDmiDataServiceName()) + .put(cmHandle.getId(), dmiPropertiesAsMap(cmHandle)); + } + }); + return dmiNameCmHandlePropertiesMap; + } + + private String createEventKey(final SubscriptionEvent subscriptionEvent, final String dmiName) { + return subscriptionEvent.getEvent().getSubscription().getClientID() + + "-" + + subscriptionEvent.getEvent().getSubscription().getName() + + "-" + + dmiName; + } + + public Map<String, String> dmiPropertiesAsMap(final YangModelCmHandle yangModelCmHandle) { + return yangModelCmHandle.getDmiProperties().stream().collect( + Collectors.toMap(YangModelCmHandle.Property::getName, YangModelCmHandle.Property::getValue)); + } + +} diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/event/lcm/LcmEventsService.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/event/lcm/LcmEventsService.java index a94d664de9..2eba83053b 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/event/lcm/LcmEventsService.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/event/lcm/LcmEventsService.java @@ -23,6 +23,7 @@ package org.onap.cps.ncmp.api.impl.event.lcm; import io.micrometer.core.annotation.Timed; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.onap.cps.ncmp.api.impl.event.EventsPublisher; import org.onap.ncmp.cmhandle.event.lcm.LcmEvent; import org.springframework.beans.factory.annotation.Value; import org.springframework.kafka.KafkaException; @@ -37,7 +38,7 @@ import org.springframework.stereotype.Service; @RequiredArgsConstructor public class LcmEventsService { - private final LcmEventsPublisher lcmEventsPublisher; + private final EventsPublisher<LcmEvent> eventsPublisher; @Value("${app.lcm.events.topic:ncmp-events}") private String topicName; @@ -56,7 +57,7 @@ public class LcmEventsService { public void publishLcmEvent(final String cmHandleId, final LcmEvent lcmEvent) { if (notificationsEnabled) { try { - lcmEventsPublisher.publishEvent(topicName, cmHandleId, lcmEvent); + eventsPublisher.publishEvent(topicName, cmHandleId, lcmEvent); } catch (final KafkaException e) { log.error("Unable to publish message to topic : {} and cause : {}", topicName, e.getMessage()); } diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/init/SubscriptionModelLoader.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/init/SubscriptionModelLoader.java index 0d82bb5c1c..705c9d2664 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/init/SubscriptionModelLoader.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/init/SubscriptionModelLoader.java @@ -22,11 +22,13 @@ package org.onap.cps.ncmp.init; import java.io.InputStream; import java.nio.charset.StandardCharsets; +import java.time.OffsetDateTime; import java.util.Map; import lombok.NonNull; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.onap.cps.api.CpsAdminService; +import org.onap.cps.api.CpsDataService; import org.onap.cps.api.CpsModuleService; import org.onap.cps.ncmp.api.impl.exception.NcmpStartUpException; import org.onap.cps.spi.exceptions.AlreadyDefinedException; @@ -42,9 +44,11 @@ public class SubscriptionModelLoader implements ModelLoader { private final CpsAdminService cpsAdminService; private final CpsModuleService cpsModuleService; + private final CpsDataService cpsDataService; private static final String SUBSCRIPTION_DATASPACE_NAME = "NCMP-Admin"; private static final String SUBSCRIPTION_ANCHOR_NAME = "AVC-Subscriptions"; private static final String SUBSCRIPTION_SCHEMASET_NAME = "subscriptions"; + private static final String SUBSCRIPTION_REGISTRY_DATANODE_NAME = "subscription-registry"; @Value("${ncmp.model-loader.subscription:false}") private boolean subscriptionModelLoaderEnabled; @@ -76,6 +80,8 @@ public class SubscriptionModelLoader implements ModelLoader { if (!yangResourceContentMap.get("subscription.yang").isEmpty()) { createSchemaSet(SUBSCRIPTION_DATASPACE_NAME, SUBSCRIPTION_SCHEMASET_NAME, yangResourceContentMap); createAnchor(SUBSCRIPTION_DATASPACE_NAME, SUBSCRIPTION_SCHEMASET_NAME, SUBSCRIPTION_ANCHOR_NAME); + createTopLevelDataNode(SUBSCRIPTION_DATASPACE_NAME, SUBSCRIPTION_ANCHOR_NAME, + SUBSCRIPTION_REGISTRY_DATANODE_NAME); } } @@ -116,6 +122,20 @@ public class SubscriptionModelLoader implements ModelLoader { return true; } + private void createTopLevelDataNode(final String dataspaceName, + final String anchorName, + final String dataNodeName) { + final String nodeData = "{\"" + dataNodeName + "\":{}}"; + try { + cpsDataService.saveData(dataspaceName, anchorName, nodeData, OffsetDateTime.now()); + } catch (final AlreadyDefinedException exception) { + log.info("Creating new data node {} failed as data node already exists", dataNodeName); + } catch (final Exception exception) { + log.debug("Creating data node for subscription model failed: {}", exception.getMessage()); + throw new NcmpStartUpException("Creating data node failed", exception.getMessage()); + } + } + private String getFileContentAsString() { try (InputStream inputStream = getClass().getClassLoader() .getResourceAsStream("model/subscription.yang")) { |