From 7549776fb3b82de2d14ae60e1bddf74bf62f8f79 Mon Sep 17 00:00:00 2001 From: lukegleeson Date: Thu, 23 Feb 2023 11:43:13 +0000 Subject: Forward Subscription Information to DMI Plugin(s) Note: Implemented common EventsPublisher class Issue-ID: CPS-1431 Signed-off-by: lukegleeson Change-Id: I292a95f2c990a140f5fd63622ca4eba3f8284b9e --- .../cps/ncmp/api/impl/event/EventsPublisher.java | 66 +++++++++++++ .../impl/event/avc/SubscriptionEventConsumer.java | 25 ++++- .../impl/event/avc/SubscriptionEventForwarder.java | 106 +++++++++++++++++++++ .../api/impl/event/lcm/LcmEventsPublisher.java | 67 ------------- .../ncmp/api/impl/event/lcm/LcmEventsService.java | 5 +- 5 files changed, 196 insertions(+), 73 deletions(-) create mode 100644 cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/event/EventsPublisher.java create mode 100644 cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/event/avc/SubscriptionEventForwarder.java delete mode 100644 cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/event/lcm/LcmEventsPublisher.java (limited to 'cps-ncmp-service/src/main/java/org') diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/event/EventsPublisher.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/event/EventsPublisher.java new file mode 100644 index 0000000000..60d39db7a2 --- /dev/null +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/event/EventsPublisher.java @@ -0,0 +1,66 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (C) 2022 Nordix Foundation + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.cps.ncmp.api.impl.event; + +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.support.SendResult; +import org.springframework.stereotype.Service; +import org.springframework.util.concurrent.ListenableFuture; +import org.springframework.util.concurrent.ListenableFutureCallback; + +/** + * EventsPublisher to publish events. + */ + +@Slf4j +@Service +@RequiredArgsConstructor +public class EventsPublisher { + + private final KafkaTemplate eventKafkaTemplate; + + /** + * LCM Event publisher. + * + * @param topicName valid topic name + * @param eventKey message key + * @param event message payload + */ + public void publishEvent(final String topicName, final String eventKey, final T event) { + final ListenableFuture> eventFuture = + eventKafkaTemplate.send(topicName, eventKey, event); + + eventFuture.addCallback(new ListenableFutureCallback<>() { + @Override + public void onFailure(final Throwable throwable) { + log.error("Unable to publish event to topic : {} due to {}", topicName, throwable.getMessage()); + } + + @Override + public void onSuccess(final SendResult sendResult) { + log.debug("Successfully published event to topic : {} , Event : {}", + sendResult.getRecordMetadata().topic(), sendResult.getProducerRecord().value()); + } + }); + } +} diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/event/avc/SubscriptionEventConsumer.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/event/avc/SubscriptionEventConsumer.java index 92949cbb79..d08baac5d4 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/event/avc/SubscriptionEventConsumer.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/event/avc/SubscriptionEventConsumer.java @@ -22,7 +22,10 @@ package org.onap.cps.ncmp.api.impl.event.avc; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.onap.cps.ncmp.event.model.InnerSubscriptionEvent; import org.onap.cps.ncmp.event.model.SubscriptionEvent; +import org.onap.cps.spi.exceptions.OperationNotYetSupportedException; +import org.springframework.beans.factory.annotation.Value; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component; @@ -32,6 +35,11 @@ import org.springframework.stereotype.Component; @RequiredArgsConstructor public class SubscriptionEventConsumer { + private final SubscriptionEventForwarder subscriptionEventForwarder; + + @Value("${notification.enabled:true}") + private boolean notificationFeatureEnabled; + /** * Consume the specified event. * @@ -40,12 +48,21 @@ public class SubscriptionEventConsumer { @KafkaListener(topics = "${app.ncmp.avc.subscription-topic}", properties = {"spring.json.value.default.type=org.onap.cps.ncmp.event.model.SubscriptionEvent"}) public void consumeSubscriptionEvent(final SubscriptionEvent subscriptionEvent) { - if ("CM".equals(subscriptionEvent.getEvent().getDataType().getDataCategory())) { + final InnerSubscriptionEvent event = subscriptionEvent.getEvent(); + final String eventDatastore = event.getPredicates().getDatastore(); + if (!(eventDatastore.equals("passthrough-running") || eventDatastore.equals("passthrough-operational"))) { + throw new OperationNotYetSupportedException( + "passthrough datastores are currently only supported for event subscriptions"); + } + if ("CM".equals(event.getDataType().getDataCategory())) { log.debug("Consuming event {} ...", subscriptionEvent); if ("CREATE".equals(subscriptionEvent.getEventType().value())) { - log.info("Subscription for ClientID {} with name{} ...", - subscriptionEvent.getEvent().getSubscription().getClientID(), - subscriptionEvent.getEvent().getSubscription().getName()); + log.info("Subscription for ClientID {} with name {} ...", + event.getSubscription().getClientID(), + event.getSubscription().getName()); + if (notificationFeatureEnabled) { + subscriptionEventForwarder.forwardCreateSubscriptionEvent(subscriptionEvent); + } } } else { log.trace("Non-CM subscription event ignored"); diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/event/avc/SubscriptionEventForwarder.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/event/avc/SubscriptionEventForwarder.java new file mode 100644 index 0000000000..635059bfe1 --- /dev/null +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/event/avc/SubscriptionEventForwarder.java @@ -0,0 +1,106 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (C) 2023 Nordix Foundation + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.cps.ncmp.api.impl.event.avc; + +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.stream.Collectors; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.onap.cps.ncmp.api.impl.event.EventsPublisher; +import org.onap.cps.ncmp.api.impl.operations.RequiredDmiService; +import org.onap.cps.ncmp.api.impl.yangmodels.YangModelCmHandle; +import org.onap.cps.ncmp.api.inventory.InventoryPersistence; +import org.onap.cps.ncmp.event.model.SubscriptionEvent; +import org.onap.cps.spi.exceptions.OperationNotYetSupportedException; +import org.springframework.stereotype.Component; + + +@Component +@Slf4j +@RequiredArgsConstructor +public class SubscriptionEventForwarder { + + private final InventoryPersistence inventoryPersistence; + private final EventsPublisher eventsPublisher; + + private static final String DMI_AVC_SUBSCRIPTION_TOPIC_PREFIX = "ncmp-dmi-cm-avc-subscription-"; + + /** + * Forward subscription event. + * + * @param subscriptionEvent the event to be forwarded + */ + public void forwardCreateSubscriptionEvent(final SubscriptionEvent subscriptionEvent) { + final List cmHandleTargets = subscriptionEvent.getEvent().getPredicates().getTargets(); + if (cmHandleTargets == null || cmHandleTargets.isEmpty() + || cmHandleTargets.stream().anyMatch(id -> ((String) id).contains("*"))) { + throw new OperationNotYetSupportedException( + "CMHandle targets are required. \"Wildcard\" operations are not yet supported"); + } + final List cmHandleTargetsAsStrings = cmHandleTargets.stream().map( + Objects::toString).collect(Collectors.toList()); + final Collection yangModelCmHandles = + inventoryPersistence.getYangModelCmHandles(cmHandleTargetsAsStrings); + final Map>> dmiNameCmHandleMap = + organizeByDmiName(yangModelCmHandles); + dmiNameCmHandleMap.forEach((dmiName, cmHandlePropertiesMap) -> { + subscriptionEvent.getEvent().getPredicates().setTargets(Collections.singletonList(cmHandlePropertiesMap)); + final String eventKey = createEventKey(subscriptionEvent, dmiName); + eventsPublisher.publishEvent(DMI_AVC_SUBSCRIPTION_TOPIC_PREFIX + dmiName, eventKey, subscriptionEvent); + }); + } + + private Map>> organizeByDmiName( + final Collection yangModelCmHandles) { + final Map>> dmiNameCmHandlePropertiesMap = new HashMap<>(); + yangModelCmHandles.forEach(cmHandle -> { + final String dmiName = cmHandle.resolveDmiServiceName(RequiredDmiService.DATA); + if (!dmiNameCmHandlePropertiesMap.containsKey(dmiName)) { + final Map> cmHandleDmiPropertiesMap = new HashMap<>(); + cmHandleDmiPropertiesMap.put(cmHandle.getId(), dmiPropertiesAsMap(cmHandle)); + dmiNameCmHandlePropertiesMap.put(cmHandle.getDmiDataServiceName(), cmHandleDmiPropertiesMap); + } else { + dmiNameCmHandlePropertiesMap.get(cmHandle.getDmiDataServiceName()) + .put(cmHandle.getId(), dmiPropertiesAsMap(cmHandle)); + } + }); + return dmiNameCmHandlePropertiesMap; + } + + private String createEventKey(final SubscriptionEvent subscriptionEvent, final String dmiName) { + return subscriptionEvent.getEvent().getSubscription().getClientID() + + "-" + + subscriptionEvent.getEvent().getSubscription().getName() + + "-" + + dmiName; + } + + public Map dmiPropertiesAsMap(final YangModelCmHandle yangModelCmHandle) { + return yangModelCmHandle.getDmiProperties().stream().collect( + Collectors.toMap(YangModelCmHandle.Property::getName, YangModelCmHandle.Property::getValue)); + } + +} diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/event/lcm/LcmEventsPublisher.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/event/lcm/LcmEventsPublisher.java deleted file mode 100644 index eda881767d..0000000000 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/event/lcm/LcmEventsPublisher.java +++ /dev/null @@ -1,67 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * Copyright (C) 2022 Nordix Foundation - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * SPDX-License-Identifier: Apache-2.0 - * ============LICENSE_END========================================================= - */ - -package org.onap.cps.ncmp.api.impl.event.lcm; - -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; -import org.onap.ncmp.cmhandle.event.lcm.LcmEvent; -import org.springframework.kafka.core.KafkaTemplate; -import org.springframework.kafka.support.SendResult; -import org.springframework.stereotype.Service; -import org.springframework.util.concurrent.ListenableFuture; -import org.springframework.util.concurrent.ListenableFutureCallback; - -/** - * LcmEventsPublisher to publish the LcmEvents on event of CREATE, UPDATE and DELETE. - */ - -@Slf4j -@Service -@RequiredArgsConstructor -public class LcmEventsPublisher { - - private final KafkaTemplate lcmEventKafkaTemplate; - - /** - * LCM Event publisher. - * - * @param topicName valid topic name - * @param eventKey message key - * @param lcmEvent message payload - */ - public void publishEvent(final String topicName, final String eventKey, final LcmEvent lcmEvent) { - final ListenableFuture> lcmEventFuture = - lcmEventKafkaTemplate.send(topicName, eventKey, lcmEvent); - - lcmEventFuture.addCallback(new ListenableFutureCallback<>() { - @Override - public void onFailure(final Throwable throwable) { - log.error("Unable to publish event to topic : {} due to {}", topicName, throwable.getMessage()); - } - - @Override - public void onSuccess(final SendResult sendResult) { - log.debug("Successfully published event to topic : {} , LcmEvent : {}", - sendResult.getRecordMetadata().topic(), sendResult.getProducerRecord().value()); - } - }); - } -} diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/event/lcm/LcmEventsService.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/event/lcm/LcmEventsService.java index a94d664de9..2eba83053b 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/event/lcm/LcmEventsService.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/event/lcm/LcmEventsService.java @@ -23,6 +23,7 @@ package org.onap.cps.ncmp.api.impl.event.lcm; import io.micrometer.core.annotation.Timed; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.onap.cps.ncmp.api.impl.event.EventsPublisher; import org.onap.ncmp.cmhandle.event.lcm.LcmEvent; import org.springframework.beans.factory.annotation.Value; import org.springframework.kafka.KafkaException; @@ -37,7 +38,7 @@ import org.springframework.stereotype.Service; @RequiredArgsConstructor public class LcmEventsService { - private final LcmEventsPublisher lcmEventsPublisher; + private final EventsPublisher eventsPublisher; @Value("${app.lcm.events.topic:ncmp-events}") private String topicName; @@ -56,7 +57,7 @@ public class LcmEventsService { public void publishLcmEvent(final String cmHandleId, final LcmEvent lcmEvent) { if (notificationsEnabled) { try { - lcmEventsPublisher.publishEvent(topicName, cmHandleId, lcmEvent); + eventsPublisher.publishEvent(topicName, cmHandleId, lcmEvent); } catch (final KafkaException e) { log.error("Unable to publish message to topic : {} and cause : {}", topicName, e.getMessage()); } -- cgit 1.2.3-korg