diff options
Diffstat (limited to 'cps-ncmp-service/src/main/java/org')
8 files changed, 190 insertions, 14 deletions
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/NetworkCmProxyDataServiceImpl.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/NetworkCmProxyDataServiceImpl.java index d00d2119b1..5aad404e61 100755 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/NetworkCmProxyDataServiceImpl.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/NetworkCmProxyDataServiceImpl.java @@ -93,8 +93,12 @@ public class NetworkCmProxyDataServiceImpl implements NetworkCmProxyDataService final DmiPluginRegistration dmiPluginRegistration) { dmiPluginRegistration.validateDmiPluginRegistration(); final DmiPluginRegistrationResponse dmiPluginRegistrationResponse = new DmiPluginRegistrationResponse(); - dmiPluginRegistrationResponse.setRemovedCmHandles( - parseAndRemoveCmHandlesInDmiRegistration(dmiPluginRegistration.getRemovedCmHandles())); + + if (!dmiPluginRegistration.getRemovedCmHandles().isEmpty()) { + dmiPluginRegistrationResponse.setRemovedCmHandles( + parseAndRemoveCmHandlesInDmiRegistration(dmiPluginRegistration.getRemovedCmHandles())); + } + if (!dmiPluginRegistration.getCreatedCmHandles().isEmpty()) { dmiPluginRegistrationResponse.setCreatedCmHandles( parseAndCreateCmHandlesInDmiRegistrationAndSyncModules(dmiPluginRegistration)); @@ -321,15 +325,13 @@ public class NetworkCmProxyDataServiceImpl implements NetworkCmProxyDataService final List<String> tobeRemovedCmHandles) { final List<CmHandleRegistrationResponse> cmHandleRegistrationResponses = new ArrayList<>(tobeRemovedCmHandles.size()); + + setState(tobeRemovedCmHandles, CmHandleState.DELETING); + for (final String cmHandleId : tobeRemovedCmHandles) { try { - final YangModelCmHandle yangModelCmHandle = inventoryPersistence.getYangModelCmHandle(cmHandleId); - lcmEventsCmHandleStateHandler.updateCmHandleState(yangModelCmHandle, - CmHandleState.DELETING); deleteCmHandleFromDbAndModuleSyncMap(cmHandleId); cmHandleRegistrationResponses.add(CmHandleRegistrationResponse.createSuccessResponse(cmHandleId)); - lcmEventsCmHandleStateHandler.updateCmHandleState(yangModelCmHandle, - CmHandleState.DELETED); } catch (final DataNodeNotFoundException dataNodeNotFoundException) { log.error("Unable to find dataNode for cmHandleId : {} , caused by : {}", cmHandleId, dataNodeNotFoundException.getMessage()); @@ -347,9 +349,22 @@ public class NetworkCmProxyDataServiceImpl implements NetworkCmProxyDataService CmHandleRegistrationResponse.createFailureResponse(cmHandleId, exception)); } } + + setState(tobeRemovedCmHandles, CmHandleState.DELETED); + return cmHandleRegistrationResponses; } + private void setState(final List<String> tobeRemovedCmHandles, final CmHandleState cmHandleState) { + final Map<YangModelCmHandle, CmHandleState> cmHandleIdsToBeRemoved = new HashMap<>(); + for (final String cmHandleId : tobeRemovedCmHandles) { + cmHandleIdsToBeRemoved.put( + inventoryPersistence.getYangModelCmHandle(cmHandleId), + cmHandleState); + } + lcmEventsCmHandleStateHandler.updateCmHandleStateBatch(cmHandleIdsToBeRemoved); + } + private void deleteCmHandleFromDbAndModuleSyncMap(final String cmHandleId) { inventoryPersistence.deleteSchemaSetWithCascade(cmHandleId); inventoryPersistence.deleteListOrListElement("/dmi-registry/cm-handles[@id='" + cmHandleId + "']"); diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/NcmpAsyncRequestResponseEventConsumer.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/NcmpAsyncRequestResponseEventConsumer.java index a9e7164fd7..bc6624dee6 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/NcmpAsyncRequestResponseEventConsumer.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/NcmpAsyncRequestResponseEventConsumer.java @@ -1,6 +1,6 @@ /* * ============LICENSE_START======================================================= - * Copyright (c) 2022 Nordix Foundation. + * Copyright (c) 2023 Nordix Foundation. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -45,7 +45,9 @@ public class NcmpAsyncRequestResponseEventConsumer { * * @param dmiAsyncRequestResponseEvent the event to be consumed and produced. */ - @KafkaListener(topics = "${app.ncmp.async-m2m.topic}") + @KafkaListener( + topics = "${app.ncmp.async-m2m.topic}", + properties = {"spring.json.value.default.type=org.onap.cps.ncmp.event.model.DmiAsyncRequestResponseEvent"}) public void consumeAndForward(final DmiAsyncRequestResponseEvent dmiAsyncRequestResponseEvent) { log.debug("Consuming event {} ...", dmiAsyncRequestResponseEvent); diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/embeddedcache/SynchronizationCacheConfig.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/embeddedcache/SynchronizationCacheConfig.java index 5154be7990..5c3cb60c28 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/embeddedcache/SynchronizationCacheConfig.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/config/embeddedcache/SynchronizationCacheConfig.java @@ -1,6 +1,6 @@ /* * ============LICENSE_START======================================================== - * Copyright (C) 2022 Nordix Foundation + * Copyright (C) 2022-2023 Nordix Foundation * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -38,7 +38,7 @@ import org.springframework.context.annotation.Configuration; @Configuration public class SynchronizationCacheConfig { - public static final int MODULE_SYNC_STARTED_TTL_SECS = 60; + public static final int MODULE_SYNC_STARTED_TTL_SECS = 120; public static final int DATA_SYNC_SEMAPHORE_TTL_SECS = 1800; private static final QueueConfig commonQueueConfig = createQueueConfig(); diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/notifications/avc/AvcEventConsumer.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/notifications/avc/AvcEventConsumer.java new file mode 100644 index 0000000000..79a36bf506 --- /dev/null +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/notifications/avc/AvcEventConsumer.java @@ -0,0 +1,53 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (c) 2023 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.cps.ncmp.api.impl.notifications.avc; + +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.onap.cps.ncmp.event.model.AvcEvent; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.stereotype.Component; + +/** + * Listener for AVC events. + */ +@Component +@Slf4j +@RequiredArgsConstructor +@ConditionalOnProperty(name = "notification.enabled", havingValue = "true", matchIfMissing = true) +public class AvcEventConsumer { + + private final AvcEventProducer avcEventProducer; + + /** + * Consume the specified event. + * + * @param avcEvent the event to be consumed and produced. + */ + @KafkaListener( + topics = "dmi-cm-events", + properties = {"spring.json.value.default.type=org.onap.cps.ncmp.event.model.AvcEvent"}) + public void consumeAndForward(final AvcEvent avcEvent) { + log.debug("Consuming AVC event {} ...", avcEvent); + avcEventProducer.sendMessage(avcEvent); + } +} diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/notifications/avc/AvcEventMapper.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/notifications/avc/AvcEventMapper.java new file mode 100644 index 0000000000..531de46414 --- /dev/null +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/notifications/avc/AvcEventMapper.java @@ -0,0 +1,50 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (C) 2023 Nordix Foundation + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.cps.ncmp.api.impl.notifications.avc; + +import java.util.UUID; +import org.mapstruct.Mapper; +import org.mapstruct.Mapping; +import org.mapstruct.Named; +import org.onap.cps.ncmp.event.model.AvcEvent; + + +/** + * Mapper for converting incoming {@link AvcEvent} to outgoing {@link AvcEvent}. + */ +@Mapper(componentModel = "spring") +public interface AvcEventMapper { + + @Mapping(source = "eventTime", target = "eventTime") + @Mapping(source = "eventId", target = "eventId", qualifiedByName = "avcEventId") + @Mapping(source = "eventCorrelationId", target = "eventCorrelationId") + @Mapping(source = "eventSchema", target = "eventSchema") + @Mapping(source = "eventSchemaVersion", target = "eventSchemaVersion") + @Mapping(source = "eventTarget", target = "eventTarget") + @Mapping(source = "eventType", target = "eventType") + AvcEvent toOutgoingAvcEvent(AvcEvent incomingAvcEvent); + + @Named("avcEventId") + static String getAvcEventId(String eventId) { + return UUID.randomUUID().toString(); + } + +} diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/notifications/avc/AvcEventProducer.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/notifications/avc/AvcEventProducer.java new file mode 100644 index 0000000000..049f66100b --- /dev/null +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/notifications/avc/AvcEventProducer.java @@ -0,0 +1,52 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (C) 2023 Nordix Foundation + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.cps.ncmp.api.impl.notifications.avc; + +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.onap.cps.ncmp.event.model.AvcEvent; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.stereotype.Service; + +/** + * Producer for AVC events. + */ +@Slf4j +@Service +@RequiredArgsConstructor +public class AvcEventProducer { + + private final KafkaTemplate<String, AvcEvent> kafkaTemplate; + + private final AvcEventMapper avcEventMapper; + + /** + * Sends message to the configured topic with a message key. + * + * @param incomingAvcEvent message payload + */ + public void sendMessage(final AvcEvent incomingAvcEvent) { + // generate new event id while keeping other data + final AvcEvent outgoingAvcEvent = avcEventMapper.toOutgoingAvcEvent(incomingAvcEvent); + log.debug("Forwarding AVC event {} to topic {} ", outgoingAvcEvent.getEventId(), "cm-events"); + kafkaTemplate.send("cm-events", outgoingAvcEvent.getEventId(), outgoingAvcEvent); + } +} diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncTasks.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncTasks.java index 004ef289ac..3fbebe0771 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncTasks.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncTasks.java @@ -1,6 +1,6 @@ /* * ============LICENSE_START======================================================= - * Copyright (C) 2022 Nordix Foundation + * Copyright (C) 2022-2023 Nordix Foundation * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -97,6 +97,7 @@ public class ModuleSyncTasks { for (final YangModelCmHandle failedCmHandle : failedCmHandles) { final CompositeState compositeState = failedCmHandle.getCompositeState(); final boolean isReadyForRetry = syncUtils.isReadyForRetry(compositeState); + log.info("Retry for cmHandleId : {} is {}", failedCmHandle.getId(), isReadyForRetry); if (isReadyForRetry) { final String resetCmHandleId = failedCmHandle.getId(); log.debug("Reset cm handle {} state to ADVISED to be re-attempted by module-sync watchdog", @@ -115,7 +116,7 @@ public class ModuleSyncTasks { private void removeResetCmHandleFromModuleSyncMap(final String resetCmHandleId) { if (moduleSyncStartedOnCmHandles.remove(resetCmHandleId) != null) { - log.debug("{} removed from in progress map", resetCmHandleId); + log.info("{} removed from in progress map", resetCmHandleId); } } } diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncWatchdog.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncWatchdog.java index f629b71d26..8acaa0abe3 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncWatchdog.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/inventory/sync/ModuleSyncWatchdog.java @@ -1,6 +1,6 @@ /* * ============LICENSE_START======================================================= - * Copyright (C) 2022 Nordix Foundation + * Copyright (C) 2022-2023 Nordix Foundation * Modifications Copyright (C) 2022 Bell Canada * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); @@ -88,11 +88,13 @@ public class ModuleSyncWatchdog { public void resetPreviouslyFailedCmHandles() { log.info("Processing module sync retry-watchdog waking up."); final List<YangModelCmHandle> failedCmHandles = syncUtils.getModuleSyncFailedCmHandles(); + log.info("Retrying {} cmHandles", failedCmHandles.size()); moduleSyncTasks.resetFailedCmHandles(failedCmHandles); } private void preventBusyWait() { try { + log.info("Busy waiting now"); TimeUnit.MILLISECONDS.sleep(PREVENT_CPU_BURN_WAIT_TIME_MILLIS); } catch (final InterruptedException e) { Thread.currentThread().interrupt(); @@ -108,6 +110,7 @@ public class ModuleSyncWatchdog { log.warn("Unable to add cm handle {} to the work queue", advisedCmHandle.getLeaves().get("id")); } } + log.info("Work Queue Size : {}", moduleSyncWorkQueue.size()); } } |