diff options
Diffstat (limited to 'cps-ncmp-service/src/main')
-rw-r--r-- | cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/event/EventsPublisher.java (renamed from cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/event/lcm/LcmEventsPublisher.java) | 23 | ||||
-rw-r--r-- | cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/event/avc/SubscriptionEventConsumer.java | 25 | ||||
-rw-r--r-- | cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/event/avc/SubscriptionEventForwarder.java | 106 | ||||
-rw-r--r-- | cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/event/lcm/LcmEventsService.java | 5 |
4 files changed, 141 insertions, 18 deletions
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/event/lcm/LcmEventsPublisher.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/event/EventsPublisher.java index eda881767d..60d39db7a2 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/event/lcm/LcmEventsPublisher.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/event/EventsPublisher.java @@ -18,11 +18,10 @@ * ============LICENSE_END========================================================= */ -package org.onap.cps.ncmp.api.impl.event.lcm; +package org.onap.cps.ncmp.api.impl.event; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import org.onap.ncmp.cmhandle.event.lcm.LcmEvent; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.support.SendResult; import org.springframework.stereotype.Service; @@ -30,36 +29,36 @@ import org.springframework.util.concurrent.ListenableFuture; import org.springframework.util.concurrent.ListenableFutureCallback; /** - * LcmEventsPublisher to publish the LcmEvents on event of CREATE, UPDATE and DELETE. + * EventsPublisher to publish events. */ @Slf4j @Service @RequiredArgsConstructor -public class LcmEventsPublisher { +public class EventsPublisher<T> { - private final KafkaTemplate<String, LcmEvent> lcmEventKafkaTemplate; + private final KafkaTemplate<String, T> eventKafkaTemplate; /** * LCM Event publisher. * * @param topicName valid topic name * @param eventKey message key - * @param lcmEvent message payload + * @param event message payload */ - public void publishEvent(final String topicName, final String eventKey, final LcmEvent lcmEvent) { - final ListenableFuture<SendResult<String, LcmEvent>> lcmEventFuture = - lcmEventKafkaTemplate.send(topicName, eventKey, lcmEvent); + public void publishEvent(final String topicName, final String eventKey, final T event) { + final ListenableFuture<SendResult<String, T>> eventFuture = + eventKafkaTemplate.send(topicName, eventKey, event); - lcmEventFuture.addCallback(new ListenableFutureCallback<>() { + eventFuture.addCallback(new ListenableFutureCallback<>() { @Override public void onFailure(final Throwable throwable) { log.error("Unable to publish event to topic : {} due to {}", topicName, throwable.getMessage()); } @Override - public void onSuccess(final SendResult<String, LcmEvent> sendResult) { - log.debug("Successfully published event to topic : {} , LcmEvent : {}", + public void onSuccess(final SendResult<String, T> sendResult) { + log.debug("Successfully published event to topic : {} , Event : {}", sendResult.getRecordMetadata().topic(), sendResult.getProducerRecord().value()); } }); diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/event/avc/SubscriptionEventConsumer.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/event/avc/SubscriptionEventConsumer.java index 92949cbb79..d08baac5d4 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/event/avc/SubscriptionEventConsumer.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/event/avc/SubscriptionEventConsumer.java @@ -22,7 +22,10 @@ package org.onap.cps.ncmp.api.impl.event.avc; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.onap.cps.ncmp.event.model.InnerSubscriptionEvent; import org.onap.cps.ncmp.event.model.SubscriptionEvent; +import org.onap.cps.spi.exceptions.OperationNotYetSupportedException; +import org.springframework.beans.factory.annotation.Value; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component; @@ -32,6 +35,11 @@ import org.springframework.stereotype.Component; @RequiredArgsConstructor public class SubscriptionEventConsumer { + private final SubscriptionEventForwarder subscriptionEventForwarder; + + @Value("${notification.enabled:true}") + private boolean notificationFeatureEnabled; + /** * Consume the specified event. * @@ -40,12 +48,21 @@ public class SubscriptionEventConsumer { @KafkaListener(topics = "${app.ncmp.avc.subscription-topic}", properties = {"spring.json.value.default.type=org.onap.cps.ncmp.event.model.SubscriptionEvent"}) public void consumeSubscriptionEvent(final SubscriptionEvent subscriptionEvent) { - if ("CM".equals(subscriptionEvent.getEvent().getDataType().getDataCategory())) { + final InnerSubscriptionEvent event = subscriptionEvent.getEvent(); + final String eventDatastore = event.getPredicates().getDatastore(); + if (!(eventDatastore.equals("passthrough-running") || eventDatastore.equals("passthrough-operational"))) { + throw new OperationNotYetSupportedException( + "passthrough datastores are currently only supported for event subscriptions"); + } + if ("CM".equals(event.getDataType().getDataCategory())) { log.debug("Consuming event {} ...", subscriptionEvent); if ("CREATE".equals(subscriptionEvent.getEventType().value())) { - log.info("Subscription for ClientID {} with name{} ...", - subscriptionEvent.getEvent().getSubscription().getClientID(), - subscriptionEvent.getEvent().getSubscription().getName()); + log.info("Subscription for ClientID {} with name {} ...", + event.getSubscription().getClientID(), + event.getSubscription().getName()); + if (notificationFeatureEnabled) { + subscriptionEventForwarder.forwardCreateSubscriptionEvent(subscriptionEvent); + } } } else { log.trace("Non-CM subscription event ignored"); diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/event/avc/SubscriptionEventForwarder.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/event/avc/SubscriptionEventForwarder.java new file mode 100644 index 0000000000..635059bfe1 --- /dev/null +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/event/avc/SubscriptionEventForwarder.java @@ -0,0 +1,106 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (C) 2023 Nordix Foundation + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.cps.ncmp.api.impl.event.avc; + +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.stream.Collectors; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.onap.cps.ncmp.api.impl.event.EventsPublisher; +import org.onap.cps.ncmp.api.impl.operations.RequiredDmiService; +import org.onap.cps.ncmp.api.impl.yangmodels.YangModelCmHandle; +import org.onap.cps.ncmp.api.inventory.InventoryPersistence; +import org.onap.cps.ncmp.event.model.SubscriptionEvent; +import org.onap.cps.spi.exceptions.OperationNotYetSupportedException; +import org.springframework.stereotype.Component; + + +@Component +@Slf4j +@RequiredArgsConstructor +public class SubscriptionEventForwarder { + + private final InventoryPersistence inventoryPersistence; + private final EventsPublisher<SubscriptionEvent> eventsPublisher; + + private static final String DMI_AVC_SUBSCRIPTION_TOPIC_PREFIX = "ncmp-dmi-cm-avc-subscription-"; + + /** + * Forward subscription event. + * + * @param subscriptionEvent the event to be forwarded + */ + public void forwardCreateSubscriptionEvent(final SubscriptionEvent subscriptionEvent) { + final List<Object> cmHandleTargets = subscriptionEvent.getEvent().getPredicates().getTargets(); + if (cmHandleTargets == null || cmHandleTargets.isEmpty() + || cmHandleTargets.stream().anyMatch(id -> ((String) id).contains("*"))) { + throw new OperationNotYetSupportedException( + "CMHandle targets are required. \"Wildcard\" operations are not yet supported"); + } + final List<String> cmHandleTargetsAsStrings = cmHandleTargets.stream().map( + Objects::toString).collect(Collectors.toList()); + final Collection<YangModelCmHandle> yangModelCmHandles = + inventoryPersistence.getYangModelCmHandles(cmHandleTargetsAsStrings); + final Map<String, Map<String, Map<String, String>>> dmiNameCmHandleMap = + organizeByDmiName(yangModelCmHandles); + dmiNameCmHandleMap.forEach((dmiName, cmHandlePropertiesMap) -> { + subscriptionEvent.getEvent().getPredicates().setTargets(Collections.singletonList(cmHandlePropertiesMap)); + final String eventKey = createEventKey(subscriptionEvent, dmiName); + eventsPublisher.publishEvent(DMI_AVC_SUBSCRIPTION_TOPIC_PREFIX + dmiName, eventKey, subscriptionEvent); + }); + } + + private Map<String, Map<String, Map<String, String>>> organizeByDmiName( + final Collection<YangModelCmHandle> yangModelCmHandles) { + final Map<String, Map<String, Map<String, String>>> dmiNameCmHandlePropertiesMap = new HashMap<>(); + yangModelCmHandles.forEach(cmHandle -> { + final String dmiName = cmHandle.resolveDmiServiceName(RequiredDmiService.DATA); + if (!dmiNameCmHandlePropertiesMap.containsKey(dmiName)) { + final Map<String, Map<String, String>> cmHandleDmiPropertiesMap = new HashMap<>(); + cmHandleDmiPropertiesMap.put(cmHandle.getId(), dmiPropertiesAsMap(cmHandle)); + dmiNameCmHandlePropertiesMap.put(cmHandle.getDmiDataServiceName(), cmHandleDmiPropertiesMap); + } else { + dmiNameCmHandlePropertiesMap.get(cmHandle.getDmiDataServiceName()) + .put(cmHandle.getId(), dmiPropertiesAsMap(cmHandle)); + } + }); + return dmiNameCmHandlePropertiesMap; + } + + private String createEventKey(final SubscriptionEvent subscriptionEvent, final String dmiName) { + return subscriptionEvent.getEvent().getSubscription().getClientID() + + "-" + + subscriptionEvent.getEvent().getSubscription().getName() + + "-" + + dmiName; + } + + public Map<String, String> dmiPropertiesAsMap(final YangModelCmHandle yangModelCmHandle) { + return yangModelCmHandle.getDmiProperties().stream().collect( + Collectors.toMap(YangModelCmHandle.Property::getName, YangModelCmHandle.Property::getValue)); + } + +} diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/event/lcm/LcmEventsService.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/event/lcm/LcmEventsService.java index a94d664de9..2eba83053b 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/event/lcm/LcmEventsService.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/event/lcm/LcmEventsService.java @@ -23,6 +23,7 @@ package org.onap.cps.ncmp.api.impl.event.lcm; import io.micrometer.core.annotation.Timed; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.onap.cps.ncmp.api.impl.event.EventsPublisher; import org.onap.ncmp.cmhandle.event.lcm.LcmEvent; import org.springframework.beans.factory.annotation.Value; import org.springframework.kafka.KafkaException; @@ -37,7 +38,7 @@ import org.springframework.stereotype.Service; @RequiredArgsConstructor public class LcmEventsService { - private final LcmEventsPublisher lcmEventsPublisher; + private final EventsPublisher<LcmEvent> eventsPublisher; @Value("${app.lcm.events.topic:ncmp-events}") private String topicName; @@ -56,7 +57,7 @@ public class LcmEventsService { public void publishLcmEvent(final String cmHandleId, final LcmEvent lcmEvent) { if (notificationsEnabled) { try { - lcmEventsPublisher.publishEvent(topicName, cmHandleId, lcmEvent); + eventsPublisher.publishEvent(topicName, cmHandleId, lcmEvent); } catch (final KafkaException e) { log.error("Unable to publish message to topic : {} and cause : {}", topicName, e.getMessage()); } |